001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.ArrayList;
020    import java.util.HashMap;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.locks.ReentrantReadWriteLock;
027    
028    import javax.jms.JMSException;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.broker.ConsumerBrokerExchange;
031    import org.apache.activemq.broker.DestinationAlreadyExistsException;
032    import org.apache.activemq.broker.ProducerBrokerExchange;
033    import org.apache.activemq.broker.TransportConnection;
034    import org.apache.activemq.command.ActiveMQDestination;
035    import org.apache.activemq.command.ConsumerControl;
036    import org.apache.activemq.command.ConsumerId;
037    import org.apache.activemq.command.ConsumerInfo;
038    import org.apache.activemq.command.DestinationInfo;
039    import org.apache.activemq.command.Message;
040    import org.apache.activemq.command.MessageAck;
041    import org.apache.activemq.command.MessageDispatchNotification;
042    import org.apache.activemq.command.MessagePull;
043    import org.apache.activemq.command.ProducerInfo;
044    import org.apache.activemq.command.RemoveSubscriptionInfo;
045    import org.apache.activemq.command.Response;
046    import org.apache.activemq.filter.DestinationFilter;
047    import org.apache.activemq.filter.DestinationMap;
048    import org.apache.activemq.security.SecurityContext;
049    import org.apache.activemq.thread.TaskRunnerFactory;
050    import org.apache.activemq.usage.SystemUsage;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    /**
055     *
056     */
057    public abstract class AbstractRegion implements Region {
058    
059        private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);
060    
061        protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
062        protected final DestinationMap destinationMap = new DestinationMap();
063        protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
064        protected final SystemUsage usageManager;
065        protected final DestinationFactory destinationFactory;
066        protected final DestinationStatistics destinationStatistics;
067        protected final RegionBroker broker;
068        protected boolean autoCreateDestinations = true;
069        protected final TaskRunnerFactory taskRunnerFactory;
070        protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
071        protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
072        protected boolean started;
073    
074        public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
075                TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
076            if (broker == null) {
077                throw new IllegalArgumentException("null broker");
078            }
079            this.broker = broker;
080            this.destinationStatistics = destinationStatistics;
081            this.usageManager = memoryManager;
082            this.taskRunnerFactory = taskRunnerFactory;
083            if (destinationFactory == null) {
084                throw new IllegalArgumentException("null destinationFactory");
085            }
086            this.destinationFactory = destinationFactory;
087        }
088    
089        public final void start() throws Exception {
090            started = true;
091    
092            Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
093            for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
094                ActiveMQDestination dest = iter.next();
095    
096                ConnectionContext context = new ConnectionContext();
097                context.setBroker(broker.getBrokerService().getBroker());
098                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
099                context.getBroker().addDestination(context, dest, false);
100            }
101            destinationsLock.readLock().lock();
102            try{
103                for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
104                    Destination dest = i.next();
105                    dest.start();
106                }
107            } finally {
108                destinationsLock.readLock().unlock();
109            }
110        }
111    
112        public void stop() throws Exception {
113            started = false;
114            destinationsLock.readLock().lock();
115            try{
116                for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
117                    Destination dest = i.next();
118                    dest.stop();
119                }
120            } finally {
121                destinationsLock.readLock().unlock();
122            }
123            destinations.clear();
124        }
125    
126        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
127                boolean createIfTemporary) throws Exception {
128    
129            destinationsLock.writeLock().lock();
130            try {
131                Destination dest = destinations.get(destination);
132                if (dest == null) {
133                    if (destination.isTemporary() == false || createIfTemporary) {
134                        if (LOG.isDebugEnabled()) {
135                            LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
136                        }
137                        dest = createDestination(context, destination);
138                        // intercept if there is a valid interceptor defined
139                        DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
140                        if (destinationInterceptor != null) {
141                            dest = destinationInterceptor.intercept(dest);
142                        }
143                        dest.start();
144                        destinations.put(destination, dest);
145                        destinationMap.put(destination, dest);
146                        addSubscriptionsForDestination(context, dest);
147                    }
148                    if (dest == null) {
149                        throw new JMSException("The destination " + destination + " does not exist.");
150                    }
151                }
152                return dest;
153            } finally {
154                destinationsLock.writeLock().unlock();
155            }
156        }
157    
158        public Map<ConsumerId, Subscription> getSubscriptions() {
159            return subscriptions;
160        }
161    
162        protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
163                throws Exception {
164    
165            List<Subscription> rc = new ArrayList<Subscription>();
166            // Add all consumers that are interested in the destination.
167            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
168                Subscription sub = iter.next();
169                if (sub.matches(dest.getActiveMQDestination())) {
170                    dest.addSubscription(context, sub);
171                    rc.add(sub);
172                }
173            }
174            return rc;
175    
176        }
177    
178        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
179                throws Exception {
180    
181            // No timeout.. then try to shut down right way, fails if there are
182            // current subscribers.
183            if (timeout == 0) {
184                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
185                    Subscription sub = iter.next();
186                    if (sub.matches(destination)) {
187                        throw new JMSException("Destination still has an active subscription: " + destination);
188                    }
189                }
190            }
191    
192            if (timeout > 0) {
193                // TODO: implement a way to notify the subscribers that we want to
194                // take the down
195                // the destination and that they should un-subscribe.. Then wait up
196                // to timeout time before
197                // dropping the subscription.
198            }
199    
200            if (LOG.isDebugEnabled()) {
201                LOG.debug(broker.getBrokerName() + " removing destination: " + destination);
202            }
203    
204            destinationsLock.writeLock().lock();
205            try {
206                Destination dest = destinations.remove(destination);
207                if (dest != null) {
208                    // timeout<0 or we timed out, we now force any remaining
209                    // subscriptions to un-subscribe.
210                    for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
211                        Subscription sub = iter.next();
212                        if (sub.matches(destination)) {
213                            dest.removeSubscription(context, sub, 0l);
214                        }
215                    }
216                    destinationMap.removeAll(destination);
217                    dispose(context, dest);
218                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
219                    if (destinationInterceptor != null) {
220                        destinationInterceptor.remove(dest);
221                    }
222    
223                } else {
224                    if (LOG.isDebugEnabled()) {
225                        LOG.debug("Cannot remove a destination that doesn't exist: " + destination);
226                    }
227                }
228            } finally {
229                destinationsLock.writeLock().unlock();
230            }
231        }
232    
233        /**
234         * Provide an exact or wildcard lookup of destinations in the region
235         *
236         * @return a set of matching destination objects.
237         */
238        @SuppressWarnings("unchecked")
239        public Set<Destination> getDestinations(ActiveMQDestination destination) {
240            destinationsLock.readLock().lock();
241            try{
242                return destinationMap.get(destination);
243            } finally {
244                destinationsLock.readLock().unlock();
245            }
246        }
247    
248        public Map<ActiveMQDestination, Destination> getDestinationMap() {
249            destinationsLock.readLock().lock();
250            try{
251                return destinations;
252            } finally {
253                destinationsLock.readLock().unlock();
254            }
255        }
256    
257        @SuppressWarnings("unchecked")
258        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
259            if (LOG.isDebugEnabled()) {
260                LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
261                        + info.getDestination());
262            }
263            ActiveMQDestination destination = info.getDestination();
264            if (destination != null && !destination.isPattern() && !destination.isComposite()) {
265                // lets auto-create the destination
266                lookup(context, destination,true);
267            }
268    
269            Object addGuard;
270            synchronized (consumerChangeMutexMap) {
271                addGuard = consumerChangeMutexMap.get(info.getConsumerId());
272                if (addGuard == null) {
273                    addGuard = new Object();
274                    consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
275                }
276            }
277            synchronized (addGuard) {
278                Subscription o = subscriptions.get(info.getConsumerId());
279                if (o != null) {
280                    LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
281                    return o;
282                }
283    
284                // We may need to add some destinations that are in persistent store
285                // but not active
286                // in the broker.
287                //
288                // TODO: think about this a little more. This is good cause
289                // destinations are not loaded into
290                // memory until a client needs to use the queue, but a management
291                // agent viewing the
292                // broker will not see a destination that exists in persistent
293                // store. We may want to
294                // eagerly load all destinations into the broker but have an
295                // inactive state for the
296                // destination which has reduced memory usage.
297                //
298                DestinationFilter.parseFilter(info.getDestination());
299    
300                Subscription sub = createSubscription(context, info);
301    
302                subscriptions.put(info.getConsumerId(), sub);
303    
304                // At this point we're done directly manipulating subscriptions,
305                // but we need to retain the synchronized block here. Consider
306                // otherwise what would happen if at this point a second
307                // thread added, then removed, as would be allowed with
308                // no mutex held. Remove is only essentially run once
309                // so everything after this point would be leaked.
310    
311                // Add the subscription to all the matching queues.
312                // But copy the matches first - to prevent deadlocks
313                List<Destination> addList = new ArrayList<Destination>();
314                destinationsLock.readLock().lock();
315                try {
316                    for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
317                        addList.add(dest);
318                    }
319                } finally {
320                    destinationsLock.readLock().unlock();
321                }
322    
323                for (Destination dest : addList) {
324                    dest.addSubscription(context, sub);
325                }
326    
327                if (info.isBrowser()) {
328                    ((QueueBrowserSubscription) sub).destinationsAdded();
329                }
330    
331                return sub;
332            }
333        }
334    
335        /**
336         * Get all the Destinations that are in storage
337         *
338         * @return Set of all stored destinations
339         */
340        @SuppressWarnings("rawtypes")
341        public Set getDurableDestinations() {
342            return destinationFactory.getDestinations();
343        }
344    
345        /**
346         * @return all Destinations that don't have active consumers
347         */
348        protected Set<ActiveMQDestination> getInactiveDestinations() {
349            Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
350            destinationsLock.readLock().lock();
351            try {
352                inactiveDests.removeAll(destinations.keySet());
353            } finally {
354                destinationsLock.readLock().unlock();
355            }
356            return inactiveDests;
357        }
358    
359        @SuppressWarnings("unchecked")
360        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
361            if (LOG.isDebugEnabled()) {
362                LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
363                        + info.getDestination());
364            }
365    
366            Subscription sub = subscriptions.remove(info.getConsumerId());
367            // The sub could be removed elsewhere - see ConnectionSplitBroker
368            if (sub != null) {
369    
370                // remove the subscription from all the matching queues.
371                List<Destination> removeList = new ArrayList<Destination>();
372                destinationsLock.readLock().lock();
373                try {
374                    for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
375                        removeList.add(dest);
376                    }
377                } finally {
378                    destinationsLock.readLock().unlock();
379                }
380                for (Destination dest : removeList) {
381                    dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
382                }
383    
384                destroySubscription(sub);
385            }
386            synchronized (consumerChangeMutexMap) {
387                consumerChangeMutexMap.remove(info.getConsumerId());
388            }
389        }
390    
391        protected void destroySubscription(Subscription sub) {
392            sub.destroy();
393        }
394    
395        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
396            throw new JMSException("Invalid operation.");
397        }
398    
399        public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
400            final ConnectionContext context = producerExchange.getConnectionContext();
401    
402            if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
403                final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
404                producerExchange.setRegionDestination(regionDestination);
405            }
406    
407            producerExchange.getRegionDestination().send(producerExchange, messageSend);
408        }
409    
410        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
411            Subscription sub = consumerExchange.getSubscription();
412            if (sub == null) {
413                sub = subscriptions.get(ack.getConsumerId());
414                if (sub == null) {
415                    if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
416                        LOG.warn("Ack for non existent subscription, ack:" + ack);
417                        throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
418                    } else {
419                        if (LOG.isDebugEnabled()) {
420                            LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
421                        }
422                        return;
423                    }
424                }
425                consumerExchange.setSubscription(sub);
426            }
427            sub.acknowledge(consumerExchange.getConnectionContext(), ack);
428        }
429    
430        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
431            Subscription sub = subscriptions.get(pull.getConsumerId());
432            if (sub == null) {
433                throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
434            }
435            return sub.pullMessage(context, pull);
436        }
437    
438        protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
439            Destination dest = null;
440    
441            destinationsLock.readLock().lock();
442            try {
443                dest = destinations.get(destination);
444            } finally {
445                destinationsLock.readLock().unlock();
446            }
447    
448            if (dest == null) {
449                if (isAutoCreateDestinations()) {
450                    // Try to auto create the destination... re-invoke broker
451                    // from the
452                    // top so that the proper security checks are performed.
453                    try {
454                        context.getBroker().addDestination(context, destination, createTemporary);
455                        dest = addDestination(context, destination, false);
456                    } catch (DestinationAlreadyExistsException e) {
457                        // if the destination already exists then lets ignore
458                        // this error
459                    }
460                    // We should now have the dest created.
461                    destinationsLock.readLock().lock();
462                    try {
463                        dest = destinations.get(destination);
464                    } finally {
465                        destinationsLock.readLock().unlock();
466                    }
467                }
468    
469                if (dest == null) {
470                    throw new JMSException("The destination " + destination + " does not exist.");
471                }
472            }
473            return dest;
474        }
475    
476        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
477            Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
478            if (sub != null) {
479                sub.processMessageDispatchNotification(messageDispatchNotification);
480            } else {
481                throw new JMSException("Slave broker out of sync with master - Subscription: "
482                        + messageDispatchNotification.getConsumerId() + " on "
483                        + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
484                        + messageDispatchNotification.getMessageId());
485            }
486        }
487    
488        /*
489         * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
490         * dispatch is deferred till the notification to ensure that the
491         * subscription chosen by the master is used. AMQ-2102
492         */
493        protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
494                throws Exception {
495            Destination dest = null;
496            destinationsLock.readLock().lock();
497            try {
498                dest = destinations.get(messageDispatchNotification.getDestination());
499            } finally {
500                destinationsLock.readLock().unlock();
501            }
502    
503            if (dest != null) {
504                dest.processDispatchNotification(messageDispatchNotification);
505            } else {
506                throw new JMSException("Slave broker out of sync with master - Destination: "
507                        + messageDispatchNotification.getDestination() + " does not exist for consumer "
508                        + messageDispatchNotification.getConsumerId() + " with message: "
509                        + messageDispatchNotification.getMessageId());
510            }
511        }
512    
513        public void gc() {
514            for (Subscription sub : subscriptions.values()) {
515                sub.gc();
516            }
517    
518            destinationsLock.readLock().lock();
519            try {
520                for (Destination dest : destinations.values()) {
521                    dest.gc();
522                }
523            } finally {
524                destinationsLock.readLock().unlock();
525            }
526        }
527    
528        protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
529    
530        protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
531                throws Exception {
532            return destinationFactory.createDestination(context, destination, destinationStatistics);
533        }
534    
535        public boolean isAutoCreateDestinations() {
536            return autoCreateDestinations;
537        }
538    
539        public void setAutoCreateDestinations(boolean autoCreateDestinations) {
540            this.autoCreateDestinations = autoCreateDestinations;
541        }
542    
543        @SuppressWarnings("unchecked")
544        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
545            destinationsLock.readLock().lock();
546            try {
547                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
548                    dest.addProducer(context, info);
549                }
550            } finally {
551                destinationsLock.readLock().unlock();
552            }
553        }
554    
555        /**
556         * Removes a Producer.
557         *
558         * @param context
559         *            the environment the operation is being executed under.
560         * @throws Exception
561         *             TODO
562         */
563        @SuppressWarnings("unchecked")
564        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
565            destinationsLock.readLock().lock();
566            try {
567                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
568                    dest.removeProducer(context, info);
569                }
570            } finally {
571                destinationsLock.readLock().unlock();
572            }
573        }
574    
575        protected void dispose(ConnectionContext context, Destination dest) throws Exception {
576            dest.dispose(context);
577            dest.stop();
578            destinationFactory.removeDestination(dest);
579        }
580    
581        public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
582            Subscription sub = subscriptions.get(control.getConsumerId());
583            if (sub != null && sub instanceof AbstractSubscription) {
584                ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
585                if (LOG.isDebugEnabled()) {
586                    LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: "
587                            + control.getConsumerId());
588                }
589                try {
590                    lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
591                } catch (Exception e) {
592                    LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
593                }
594            }
595        }
596    }