001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.ArrayList;
020    import java.util.HashMap;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.locks.ReentrantReadWriteLock;
027    
028    import javax.jms.JMSException;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.broker.ConsumerBrokerExchange;
031    import org.apache.activemq.DestinationDoesNotExistException;
032    import org.apache.activemq.broker.ProducerBrokerExchange;
033    import org.apache.activemq.broker.region.policy.PolicyEntry;
034    import org.apache.activemq.command.ActiveMQDestination;
035    import org.apache.activemq.command.ConsumerControl;
036    import org.apache.activemq.command.ConsumerId;
037    import org.apache.activemq.command.ConsumerInfo;
038    import org.apache.activemq.command.Message;
039    import org.apache.activemq.command.MessageAck;
040    import org.apache.activemq.command.MessageDispatchNotification;
041    import org.apache.activemq.command.MessagePull;
042    import org.apache.activemq.command.ProducerInfo;
043    import org.apache.activemq.command.RemoveSubscriptionInfo;
044    import org.apache.activemq.command.Response;
045    import org.apache.activemq.filter.DestinationFilter;
046    import org.apache.activemq.filter.DestinationMap;
047    import org.apache.activemq.security.SecurityContext;
048    import org.apache.activemq.thread.TaskRunnerFactory;
049    import org.apache.activemq.usage.SystemUsage;
050    import org.slf4j.Logger;
051    import org.slf4j.LoggerFactory;
052    
053    /**
054     *
055     */
056    public abstract class AbstractRegion implements Region {
057    
058        private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);
059    
060        protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
061        protected final DestinationMap destinationMap = new DestinationMap();
062        protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
063        protected final SystemUsage usageManager;
064        protected final DestinationFactory destinationFactory;
065        protected final DestinationStatistics destinationStatistics;
066        protected final RegionBroker broker;
067        protected boolean autoCreateDestinations = true;
068        protected final TaskRunnerFactory taskRunnerFactory;
069        protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
070        protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
071        protected boolean started;
072    
073        public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
074                TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
075            if (broker == null) {
076                throw new IllegalArgumentException("null broker");
077            }
078            this.broker = broker;
079            this.destinationStatistics = destinationStatistics;
080            this.usageManager = memoryManager;
081            this.taskRunnerFactory = taskRunnerFactory;
082            if (destinationFactory == null) {
083                throw new IllegalArgumentException("null destinationFactory");
084            }
085            this.destinationFactory = destinationFactory;
086        }
087    
088        public final void start() throws Exception {
089            started = true;
090    
091            Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
092            for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
093                ActiveMQDestination dest = iter.next();
094    
095                ConnectionContext context = new ConnectionContext();
096                context.setBroker(broker.getBrokerService().getBroker());
097                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
098                context.getBroker().addDestination(context, dest, false);
099            }
100            destinationsLock.readLock().lock();
101            try{
102                for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
103                    Destination dest = i.next();
104                    dest.start();
105                }
106            } finally {
107                destinationsLock.readLock().unlock();
108            }
109        }
110    
111        public void stop() throws Exception {
112            started = false;
113            destinationsLock.readLock().lock();
114            try{
115                for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
116                    Destination dest = i.next();
117                    dest.stop();
118                }
119            } finally {
120                destinationsLock.readLock().unlock();
121            }
122            destinations.clear();
123        }
124    
125        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
126                boolean createIfTemporary) throws Exception {
127    
128            destinationsLock.writeLock().lock();
129            try {
130                Destination dest = destinations.get(destination);
131                if (dest == null) {
132                    if (destination.isTemporary() == false || createIfTemporary) {
133                        if (LOG.isDebugEnabled()) {
134                            LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
135                        }
136                        dest = createDestination(context, destination);
137                        // intercept if there is a valid interceptor defined
138                        DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
139                        if (destinationInterceptor != null) {
140                            dest = destinationInterceptor.intercept(dest);
141                        }
142                        dest.start();
143                        destinations.put(destination, dest);
144                        destinationMap.put(destination, dest);
145                        addSubscriptionsForDestination(context, dest);
146                    }
147                    if (dest == null) {
148                        throw new DestinationDoesNotExistException(destination.getQualifiedName());
149                    }
150                }
151                return dest;
152            } finally {
153                destinationsLock.writeLock().unlock();
154            }
155        }
156    
157        public Map<ConsumerId, Subscription> getSubscriptions() {
158            return subscriptions;
159        }
160    
161        protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
162                throws Exception {
163    
164            List<Subscription> rc = new ArrayList<Subscription>();
165            // Add all consumers that are interested in the destination.
166            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
167                Subscription sub = iter.next();
168                if (sub.matches(dest.getActiveMQDestination())) {
169                    dest.addSubscription(context, sub);
170                    rc.add(sub);
171                }
172            }
173            return rc;
174    
175        }
176    
177        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
178                throws Exception {
179    
180            // No timeout.. then try to shut down right way, fails if there are
181            // current subscribers.
182            if (timeout == 0) {
183                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
184                    Subscription sub = iter.next();
185                    if (sub.matches(destination)) {
186                        throw new JMSException("Destination still has an active subscription: " + destination);
187                    }
188                }
189            }
190    
191            if (timeout > 0) {
192                // TODO: implement a way to notify the subscribers that we want to
193                // take the down
194                // the destination and that they should un-subscribe.. Then wait up
195                // to timeout time before
196                // dropping the subscription.
197            }
198    
199            if (LOG.isDebugEnabled()) {
200                LOG.debug(broker.getBrokerName() + " removing destination: " + destination);
201            }
202    
203            destinationsLock.writeLock().lock();
204            try {
205                Destination dest = destinations.remove(destination);
206                if (dest != null) {
207                    // timeout<0 or we timed out, we now force any remaining
208                    // subscriptions to un-subscribe.
209                    for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
210                        Subscription sub = iter.next();
211                        if (sub.matches(destination)) {
212                            dest.removeSubscription(context, sub, 0l);
213                        }
214                    }
215                    destinationMap.removeAll(destination);
216                    dispose(context, dest);
217                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
218                    if (destinationInterceptor != null) {
219                        destinationInterceptor.remove(dest);
220                    }
221    
222                } else {
223                    if (LOG.isDebugEnabled()) {
224                        LOG.debug("Cannot remove a destination that doesn't exist: " + destination);
225                    }
226                }
227            } finally {
228                destinationsLock.writeLock().unlock();
229            }
230        }
231    
232        /**
233         * Provide an exact or wildcard lookup of destinations in the region
234         *
235         * @return a set of matching destination objects.
236         */
237        @SuppressWarnings("unchecked")
238        public Set<Destination> getDestinations(ActiveMQDestination destination) {
239            destinationsLock.readLock().lock();
240            try{
241                return destinationMap.get(destination);
242            } finally {
243                destinationsLock.readLock().unlock();
244            }
245        }
246    
247        public Map<ActiveMQDestination, Destination> getDestinationMap() {
248            destinationsLock.readLock().lock();
249            try{
250                return destinations;
251            } finally {
252                destinationsLock.readLock().unlock();
253            }
254        }
255    
256        @SuppressWarnings("unchecked")
257        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
258            if (LOG.isDebugEnabled()) {
259                LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
260                        + info.getDestination());
261            }
262            ActiveMQDestination destination = info.getDestination();
263            if (destination != null && !destination.isPattern() && !destination.isComposite()) {
264                // lets auto-create the destination
265                lookup(context, destination,true);
266            }
267    
268            Object addGuard;
269            synchronized (consumerChangeMutexMap) {
270                addGuard = consumerChangeMutexMap.get(info.getConsumerId());
271                if (addGuard == null) {
272                    addGuard = new Object();
273                    consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
274                }
275            }
276            synchronized (addGuard) {
277                Subscription o = subscriptions.get(info.getConsumerId());
278                if (o != null) {
279                    LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
280                    return o;
281                }
282    
283                // We may need to add some destinations that are in persistent store
284                // but not active
285                // in the broker.
286                //
287                // TODO: think about this a little more. This is good cause
288                // destinations are not loaded into
289                // memory until a client needs to use the queue, but a management
290                // agent viewing the
291                // broker will not see a destination that exists in persistent
292                // store. We may want to
293                // eagerly load all destinations into the broker but have an
294                // inactive state for the
295                // destination which has reduced memory usage.
296                //
297                DestinationFilter.parseFilter(info.getDestination());
298    
299                Subscription sub = createSubscription(context, info);
300    
301                subscriptions.put(info.getConsumerId(), sub);
302    
303                // At this point we're done directly manipulating subscriptions,
304                // but we need to retain the synchronized block here. Consider
305                // otherwise what would happen if at this point a second
306                // thread added, then removed, as would be allowed with
307                // no mutex held. Remove is only essentially run once
308                // so everything after this point would be leaked.
309    
310                // Add the subscription to all the matching queues.
311                // But copy the matches first - to prevent deadlocks
312                List<Destination> addList = new ArrayList<Destination>();
313                destinationsLock.readLock().lock();
314                try {
315                    for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
316                        addList.add(dest);
317                    }
318                } finally {
319                    destinationsLock.readLock().unlock();
320                }
321    
322                for (Destination dest : addList) {
323                    dest.addSubscription(context, sub);
324                }
325    
326                if (info.isBrowser()) {
327                    ((QueueBrowserSubscription) sub).destinationsAdded();
328                }
329    
330                return sub;
331            }
332        }
333    
334        /**
335         * Get all the Destinations that are in storage
336         *
337         * @return Set of all stored destinations
338         */
339        @SuppressWarnings("rawtypes")
340        public Set getDurableDestinations() {
341            return destinationFactory.getDestinations();
342        }
343    
344        /**
345         * @return all Destinations that don't have active consumers
346         */
347        protected Set<ActiveMQDestination> getInactiveDestinations() {
348            Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
349            destinationsLock.readLock().lock();
350            try {
351                inactiveDests.removeAll(destinations.keySet());
352            } finally {
353                destinationsLock.readLock().unlock();
354            }
355            return inactiveDests;
356        }
357    
358        @SuppressWarnings("unchecked")
359        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
360            if (LOG.isDebugEnabled()) {
361                LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
362                        + info.getDestination());
363            }
364    
365            Subscription sub = subscriptions.remove(info.getConsumerId());
366            // The sub could be removed elsewhere - see ConnectionSplitBroker
367            if (sub != null) {
368    
369                // remove the subscription from all the matching queues.
370                List<Destination> removeList = new ArrayList<Destination>();
371                destinationsLock.readLock().lock();
372                try {
373                    for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
374                        removeList.add(dest);
375                    }
376                } finally {
377                    destinationsLock.readLock().unlock();
378                }
379                for (Destination dest : removeList) {
380                    dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
381                }
382    
383                destroySubscription(sub);
384            }
385            synchronized (consumerChangeMutexMap) {
386                consumerChangeMutexMap.remove(info.getConsumerId());
387            }
388        }
389    
390        protected void destroySubscription(Subscription sub) {
391            sub.destroy();
392        }
393    
394        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
395            throw new JMSException("Invalid operation.");
396        }
397    
398        public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
399            final ConnectionContext context = producerExchange.getConnectionContext();
400    
401            if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
402                final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
403                producerExchange.setRegionDestination(regionDestination);
404            }
405    
406            producerExchange.getRegionDestination().send(producerExchange, messageSend);
407        }
408    
409        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
410            Subscription sub = consumerExchange.getSubscription();
411            if (sub == null) {
412                sub = subscriptions.get(ack.getConsumerId());
413                if (sub == null) {
414                    if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
415                        LOG.warn("Ack for non existent subscription, ack:" + ack);
416                        throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
417                    } else {
418                        if (LOG.isDebugEnabled()) {
419                            LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
420                        }
421                        return;
422                    }
423                }
424                consumerExchange.setSubscription(sub);
425            }
426            sub.acknowledge(consumerExchange.getConnectionContext(), ack);
427        }
428    
429        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
430            Subscription sub = subscriptions.get(pull.getConsumerId());
431            if (sub == null) {
432                throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
433            }
434            return sub.pullMessage(context, pull);
435        }
436    
437        protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
438            Destination dest = null;
439    
440            destinationsLock.readLock().lock();
441            try {
442                dest = destinations.get(destination);
443            } finally {
444                destinationsLock.readLock().unlock();
445            }
446    
447            if (dest == null) {
448                if (isAutoCreateDestinations()) {
449                    // Try to auto create the destination... re-invoke broker
450                    // from the
451                    // top so that the proper security checks are performed.
452                    context.getBroker().addDestination(context, destination, createTemporary);
453                    dest = addDestination(context, destination, false);
454                    // We should now have the dest created.
455                    destinationsLock.readLock().lock();
456                    try {
457                        dest = destinations.get(destination);
458                    } finally {
459                        destinationsLock.readLock().unlock();
460                    }
461                }
462    
463                if (dest == null) {
464                    throw new JMSException("The destination " + destination + " does not exist.");
465                }
466            }
467            return dest;
468        }
469    
470        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
471            Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
472            if (sub != null) {
473                sub.processMessageDispatchNotification(messageDispatchNotification);
474            } else {
475                throw new JMSException("Slave broker out of sync with master - Subscription: "
476                        + messageDispatchNotification.getConsumerId() + " on "
477                        + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
478                        + messageDispatchNotification.getMessageId());
479            }
480        }
481    
482        /*
483         * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
484         * dispatch is deferred till the notification to ensure that the
485         * subscription chosen by the master is used. AMQ-2102
486         */
487        protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
488                throws Exception {
489            Destination dest = null;
490            destinationsLock.readLock().lock();
491            try {
492                dest = destinations.get(messageDispatchNotification.getDestination());
493            } finally {
494                destinationsLock.readLock().unlock();
495            }
496    
497            if (dest != null) {
498                dest.processDispatchNotification(messageDispatchNotification);
499            } else {
500                throw new JMSException("Slave broker out of sync with master - Destination: "
501                        + messageDispatchNotification.getDestination() + " does not exist for consumer "
502                        + messageDispatchNotification.getConsumerId() + " with message: "
503                        + messageDispatchNotification.getMessageId());
504            }
505        }
506    
507        public void gc() {
508            for (Subscription sub : subscriptions.values()) {
509                sub.gc();
510            }
511    
512            destinationsLock.readLock().lock();
513            try {
514                for (Destination dest : destinations.values()) {
515                    dest.gc();
516                }
517            } finally {
518                destinationsLock.readLock().unlock();
519            }
520        }
521    
522        protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
523    
524        protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
525                throws Exception {
526            return destinationFactory.createDestination(context, destination, destinationStatistics);
527        }
528    
529        public boolean isAutoCreateDestinations() {
530            return autoCreateDestinations;
531        }
532    
533        public void setAutoCreateDestinations(boolean autoCreateDestinations) {
534            this.autoCreateDestinations = autoCreateDestinations;
535        }
536    
537        @SuppressWarnings("unchecked")
538        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
539            destinationsLock.readLock().lock();
540            try {
541                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
542                    dest.addProducer(context, info);
543                }
544            } finally {
545                destinationsLock.readLock().unlock();
546            }
547        }
548    
549        /**
550         * Removes a Producer.
551         *
552         * @param context
553         *            the environment the operation is being executed under.
554         * @throws Exception
555         *             TODO
556         */
557        @SuppressWarnings("unchecked")
558        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
559            destinationsLock.readLock().lock();
560            try {
561                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
562                    dest.removeProducer(context, info);
563                }
564            } finally {
565                destinationsLock.readLock().unlock();
566            }
567        }
568    
569        protected void dispose(ConnectionContext context, Destination dest) throws Exception {
570            dest.dispose(context);
571            dest.stop();
572            destinationFactory.removeDestination(dest);
573        }
574    
575        public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
576            Subscription sub = subscriptions.get(control.getConsumerId());
577            if (sub != null && sub instanceof AbstractSubscription) {
578                ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
579                if (broker.getDestinationPolicy() != null) {
580                    PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(control.getDestination());
581                    if (entry != null) {
582                        entry.configurePrefetch(sub);
583                    }
584                }
585                if (LOG.isDebugEnabled()) {
586                    LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: "
587                            + control.getConsumerId() + "; resulting value: " + sub.getConsumerInfo().getCurrentPrefetchSize());
588                }
589                try {
590                    lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
591                } catch (Exception e) {
592                    LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: " + control.getDestination(), e);
593                }
594            }
595        }
596    }