001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import org.apache.activemq.broker.Broker;
020    import org.apache.activemq.broker.BrokerService;
021    import org.apache.activemq.broker.Connection;
022    import org.apache.activemq.broker.ConnectionContext;
023    import org.apache.activemq.broker.ConsumerBrokerExchange;
024    import org.apache.activemq.broker.EmptyBroker;
025    import org.apache.activemq.broker.ProducerBrokerExchange;
026    import org.apache.activemq.broker.TransportConnector;
027    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
028    import org.apache.activemq.broker.region.policy.PolicyMap;
029    import org.apache.activemq.command.*;
030    import org.apache.activemq.state.ConnectionState;
031    import org.apache.activemq.store.PListStore;
032    import org.apache.activemq.thread.Scheduler;
033    import org.apache.activemq.thread.TaskRunnerFactory;
034    import org.apache.activemq.usage.SystemUsage;
035    import org.apache.activemq.util.BrokerSupport;
036    import org.apache.activemq.util.IdGenerator;
037    import org.apache.activemq.util.InetAddressUtil;
038    import org.apache.activemq.util.LongSequenceGenerator;
039    import org.apache.activemq.util.ServiceStopper;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    import javax.jms.InvalidClientIDException;
044    import javax.jms.JMSException;
045    import java.io.IOException;
046    import java.net.URI;
047    import java.util.ArrayList;
048    import java.util.Collections;
049    import java.util.HashMap;
050    import java.util.List;
051    import java.util.Locale;
052    import java.util.Map;
053    import java.util.Set;
054    import java.util.concurrent.ConcurrentHashMap;
055    import java.util.concurrent.CopyOnWriteArrayList;
056    import java.util.concurrent.ThreadPoolExecutor;
057    import java.util.concurrent.locks.ReentrantReadWriteLock;
058    
059    /**
060     * Routes Broker operations to the correct messaging regions for processing.
061     *
062     *
063     */
064    public class RegionBroker extends EmptyBroker {
    /** Message property under which {@code stampAsExpired} records a message's expiration value. */
    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
    private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
    /** Supplies the id used by {@link #getBrokerId()} when no broker id has been set explicitly. */
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();

    /** Statistics object handed to each of the four regions when they are created. */
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    /** Factory passed to every region; also seeds the sequence generator and answers {@link #getDurableDestinations()}. */
    protected DestinationFactory destinationFactory;
    /** Synchronized map exposed through {@link #getConnectionStates()}. */
    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());

    // The four regions; getRegion(...) picks one based on the destination type.
    private final Region queueRegion;
    private final Region topicRegion;
    private final Region tempQueueRegion;
    private final Region tempTopicRegion;
    protected final BrokerService brokerService;
    /** Set by {@link #start()}, cleared by {@link #stop()}, read by {@link #isStopped()}. */
    private boolean started;
    private boolean keepDurableSubsActive;

    /** Connections registered via addConnection and dropped via removeConnection. */
    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
    /** Destinations already resolved by addDestination; that method also uses this map as its monitor. */
    private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
    /** Reference-counted peer broker infos, maintained by the synchronized addBroker/removeBroker methods. */
    private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();

    private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
    /** Lazily generated by {@link #getBrokerId()} if never set. */
    private BrokerId brokerId;
    /** Lazily resolved from the local host name by {@link #getBrokerName()} if never set. */
    private String brokerName;
    /** Client id to owning connection context; addConnection/removeConnection synchronize on this map. */
    private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
    private final DestinationInterceptor destinationInterceptor;
    private ConnectionContext adminConnectionContext;
    private final Scheduler scheduler;
    private final ThreadPoolExecutor executor;
    // NOTE(review): presumably backs isAllowTempAutoCreationOnSend(), which is defined outside this view - confirm.
    private boolean allowTempAutoCreationOnSend;

    // Read lock is held by the producer/consumer/subscription add and remove operations.
    // NOTE(review): the write side is presumably taken by purgeInactiveDestinations() (not visible here) - confirm.
    private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
    /** Task scheduled periodically by {@link #start()} and cancelled by {@link #stop()}. */
    private final Runnable purgeInactiveDestinationsTask = new Runnable() {
        public void run() {
            purgeInactiveDestinations();
        }
    };
101    
102        public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
103                            DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
104            this.brokerService = brokerService;
105            this.executor=executor;
106            this.scheduler = scheduler;
107            if (destinationFactory == null) {
108                throw new IllegalArgumentException("null destinationFactory");
109            }
110            this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
111            this.destinationFactory = destinationFactory;
112            queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
113            topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
114            this.destinationInterceptor = destinationInterceptor;
115            tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
116            tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
117        }
118    
119        @Override
120        public Map<ActiveMQDestination, Destination> getDestinationMap() {
121            Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap());
122            answer.putAll(getTopicRegion().getDestinationMap());
123            return answer;
124        }
125    
126        @Override
127        public Set <Destination> getDestinations(ActiveMQDestination destination) {
128            try {
129                return getRegion(destination).getDestinations(destination);
130            } catch (JMSException jmse) {
131                return Collections.emptySet();
132            }
133        }
134    
135        @Override
136        @SuppressWarnings("rawtypes")
137        public Broker getAdaptor(Class type) {
138            if (type.isInstance(this)) {
139                return this;
140            }
141            return null;
142        }
143    
144        public Region getQueueRegion() {
145            return queueRegion;
146        }
147    
148        public Region getTempQueueRegion() {
149            return tempQueueRegion;
150        }
151    
152        public Region getTempTopicRegion() {
153            return tempTopicRegion;
154        }
155    
156        public Region getTopicRegion() {
157            return topicRegion;
158        }
159    
160        protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
161            return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
162        }
163    
164        protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
165            return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
166        }
167    
168        protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
169            return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
170        }
171    
172        protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
173            return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
174        }
175    
176        @Override
177        public void start() throws Exception {
178            started = true;
179            queueRegion.start();
180            topicRegion.start();
181            tempQueueRegion.start();
182            tempTopicRegion.start();
183            int period = this.brokerService.getSchedulePeriodForDestinationPurge();
184            if (period > 0) {
185                this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
186            }
187        }
188    
189        @Override
190        public void stop() throws Exception {
191            started = false;
192            this.scheduler.cancel(purgeInactiveDestinationsTask);
193            ServiceStopper ss = new ServiceStopper();
194            doStop(ss);
195            ss.throwFirstException();
196            // clear the state
197            clientIdSet.clear();
198            connections.clear();
199            destinations.clear();
200            brokerInfos.clear();
201        }
202    
203        public PolicyMap getDestinationPolicy() {
204            return brokerService != null ? brokerService.getDestinationPolicy() : null;
205        }
206    
207        @Override
208        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
209            String clientId = info.getClientId();
210            if (clientId == null) {
211                throw new InvalidClientIDException("No clientID specified for connection request");
212            }
213            synchronized (clientIdSet) {
214                ConnectionContext oldContext = clientIdSet.get(clientId);
215                if (oldContext != null) {
216                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
217                                                       + oldContext.getConnection().getRemoteAddress());
218                } else {
219                    clientIdSet.put(clientId, context);
220                }
221            }
222    
223            connections.add(context.getConnection());
224        }
225    
226        @Override
227        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
228            String clientId = info.getClientId();
229            if (clientId == null) {
230                throw new InvalidClientIDException("No clientID specified for connection disconnect request");
231            }
232            synchronized (clientIdSet) {
233                ConnectionContext oldValue = clientIdSet.get(clientId);
234                // we may be removing the duplicate connection, not the first
235                // connection to be created
236                // so lets check that their connection IDs are the same
237                if (oldValue == context) {
238                    if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
239                        clientIdSet.remove(clientId);
240                    }
241                }
242            }
243            connections.remove(context.getConnection());
244        }
245    
246        protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
247            return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
248        }
249    
250        @Override
251        public Connection[] getClients() throws Exception {
252            ArrayList<Connection> l = new ArrayList<Connection>(connections);
253            Connection rc[] = new Connection[l.size()];
254            l.toArray(rc);
255            return rc;
256        }
257    
258        @Override
259        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {
260    
261            Destination answer;
262    
263            answer = destinations.get(destination);
264            if (answer != null) {
265                return answer;
266            }
267    
268         synchronized (destinations) {
269            answer = destinations.get(destination);
270            if (answer != null) {
271                return answer;
272            }
273    
274            boolean create = true;
275            if (destination.isTemporary())
276                create = createIfTemp;
277            answer = getRegion(destination).addDestination(context, destination, create);
278    
279            destinations.put(destination, answer);
280            return answer;
281         }
282    
283        }
284    
285        @Override
286        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
287            if (destinations.containsKey(destination)) {
288                getRegion(destination).removeDestination(context, destination, timeout);
289                destinations.remove(destination);
290            }
291        }
292    
293        @Override
294        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
295            addDestination(context, info.getDestination(),true);
296    
297        }
298    
299        @Override
300        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
301            removeDestination(context, info.getDestination(), info.getTimeout());
302    
303        }
304    
305        @Override
306        public ActiveMQDestination[] getDestinations() throws Exception {
307            ArrayList<ActiveMQDestination> l;
308    
309            l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
310    
311            ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
312            l.toArray(rc);
313            return rc;
314        }
315    
316        @Override
317        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
318            ActiveMQDestination destination = info.getDestination();
319            if (destination != null) {
320                inactiveDestinationsPurgeLock.readLock().lock();
321                try {
322                    // This seems to cause the destination to be added but without
323                    // advisories firing...
324                    context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
325                    getRegion(destination).addProducer(context, info);
326                } finally {
327                    inactiveDestinationsPurgeLock.readLock().unlock();
328                }
329            }
330        }
331    
332        @Override
333        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
334            ActiveMQDestination destination = info.getDestination();
335            if (destination != null) {
336                inactiveDestinationsPurgeLock.readLock().lock();
337                try {
338                    getRegion(destination).removeProducer(context, info);
339                } finally {
340                    inactiveDestinationsPurgeLock.readLock().unlock();
341                }
342            }
343        }
344    
345        @Override
346        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
347            ActiveMQDestination destination = info.getDestination();
348            if (destinationInterceptor != null) {
349                destinationInterceptor.create(this, context, destination);
350            }
351            inactiveDestinationsPurgeLock.readLock().lock();
352            try {
353                return getRegion(destination).addConsumer(context, info);
354            } finally {
355                inactiveDestinationsPurgeLock.readLock().unlock();
356            }
357        }
358    
359        @Override
360        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
361            ActiveMQDestination destination = info.getDestination();
362            inactiveDestinationsPurgeLock.readLock().lock();
363            try {
364                getRegion(destination).removeConsumer(context, info);
365            } finally {
366                inactiveDestinationsPurgeLock.readLock().unlock();
367            }
368        }
369    
370        @Override
371        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
372            inactiveDestinationsPurgeLock.readLock().lock();
373            try {
374                topicRegion.removeSubscription(context, info);
375            } finally {
376                inactiveDestinationsPurgeLock.readLock().unlock();
377            }
378        }
379    
380        @Override
381        public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
382            ActiveMQDestination destination = message.getDestination();
383            message.setBrokerInTime(System.currentTimeMillis());
384            if (producerExchange.isMutable() || producerExchange.getRegion() == null
385                    || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
386                // ensure the destination is registered with the RegionBroker
387                producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
388                producerExchange.setRegion(getRegion(destination));
389                producerExchange.setRegionDestination(null);
390            }
391    
392            producerExchange.getRegion().send(producerExchange, message);
393    
394            // clean up so these references aren't kept (possible leak) in the producer exchange
395            // especially since temps are transitory
396            if (producerExchange.isMutable()) {
397                producerExchange.setRegionDestination(null);
398                producerExchange.setRegion(null);
399            }
400        }
401    
402        @Override
403        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
404            if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
405                ActiveMQDestination destination = ack.getDestination();
406                consumerExchange.setRegion(getRegion(destination));
407            }
408            consumerExchange.getRegion().acknowledge(consumerExchange, ack);
409        }
410    
411        protected Region getRegion(ActiveMQDestination destination) throws JMSException {
412            switch (destination.getDestinationType()) {
413            case ActiveMQDestination.QUEUE_TYPE:
414                return queueRegion;
415            case ActiveMQDestination.TOPIC_TYPE:
416                return topicRegion;
417            case ActiveMQDestination.TEMP_QUEUE_TYPE:
418                return tempQueueRegion;
419            case ActiveMQDestination.TEMP_TOPIC_TYPE:
420                return tempTopicRegion;
421            default:
422                throw createUnknownDestinationTypeException(destination);
423            }
424        }
425    
426        @Override
427        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
428            ActiveMQDestination destination = pull.getDestination();
429            return getRegion(destination).messagePull(context, pull);
430        }
431    
432        @Override
433        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
434            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
435        }
436    
437        @Override
438        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
439            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
440        }
441    
442        @Override
443        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
444            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
445        }
446    
447        @Override
448        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
449            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
450        }
451    
452        @Override
453        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
454            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
455        }
456    
457        @Override
458        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
459            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
460        }
461    
462        @Override
463        public void gc() {
464            queueRegion.gc();
465            topicRegion.gc();
466        }
467    
468        @Override
469        public BrokerId getBrokerId() {
470            if (brokerId == null) {
471                brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
472            }
473            return brokerId;
474        }
475    
    /**
     * Sets the broker id returned by {@link #getBrokerId()}.
     *
     * @param brokerId the id to use; if {@code null}, {@link #getBrokerId()}
     *                 generates one on its next call
     */
    public void setBrokerId(BrokerId brokerId) {
        this.brokerId = brokerId;
    }
479    
480        @Override
481        public String getBrokerName() {
482            if (brokerName == null) {
483                try {
484                    brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
485                } catch (Exception e) {
486                    brokerName = "localhost";
487                }
488            }
489            return brokerName;
490        }
491    
    /**
     * Sets the broker name returned by {@link #getBrokerName()}.
     *
     * @param brokerName the name to use; if {@code null}, {@link #getBrokerName()}
     *                   resolves one from the local host on its next call
     */
    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }
495    
496        public DestinationStatistics getDestinationStatistics() {
497            return destinationStatistics;
498        }
499    
500        protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
501            return new JMSException("Unknown destination type: " + destination.getDestinationType());
502        }
503    
504        @Override
505        public synchronized void addBroker(Connection connection, BrokerInfo info) {
506            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
507            if (existing == null) {
508                existing = info.copy();
509                existing.setPeerBrokerInfos(null);
510                brokerInfos.put(info.getBrokerId(), existing);
511            }
512            existing.incrementRefCount();
513            if (LOG.isDebugEnabled()) {
514                LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
515            }
516            addBrokerInClusterUpdate(info);
517        }
518    
519        @Override
520        public synchronized void removeBroker(Connection connection, BrokerInfo info) {
521            if (info != null) {
522                BrokerInfo existing = brokerInfos.get(info.getBrokerId());
523                if (existing != null && existing.decrementRefCount() == 0) {
524                   brokerInfos.remove(info.getBrokerId());
525                }
526                if (LOG.isDebugEnabled()) {
527                    LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
528                }
529                removeBrokerInClusterUpdate(info);
530            }
531        }
532    
533        @Override
534        public synchronized BrokerInfo[] getPeerBrokerInfos() {
535            BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
536            result = brokerInfos.values().toArray(result);
537            return result;
538        }
539    
540        @Override
541        public void preProcessDispatch(MessageDispatch messageDispatch) {
542            Message message = messageDispatch.getMessage();
543            if (message != null) {
544                long endTime = System.currentTimeMillis();
545                message.setBrokerOutTime(endTime);
546                if (getBrokerService().isEnableStatistics()) {
547                    long totalTime = endTime - message.getBrokerInTime();
548                    ((Destination)message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
549                }
550            }
551        }
552    
    /**
     * No-op: this broker does nothing after a dispatch.
     */
    @Override
    public void postProcessDispatch(MessageDispatch messageDispatch) {
    }
556    
557        @Override
558        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
559            ActiveMQDestination destination = messageDispatchNotification.getDestination();
560            getRegion(destination).processDispatchNotification(messageDispatchNotification);
561        }
562    
563        @Override
564        public boolean isStopped() {
565            return !started;
566        }
567    
568        @Override
569        public Set<ActiveMQDestination> getDurableDestinations() {
570            return destinationFactory.getDestinations();
571        }
572    
573        protected void doStop(ServiceStopper ss) {
574            ss.stop(queueRegion);
575            ss.stop(topicRegion);
576            ss.stop(tempQueueRegion);
577            ss.stop(tempTopicRegion);
578        }
579    
580        public boolean isKeepDurableSubsActive() {
581            return keepDurableSubsActive;
582        }
583    
584        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
585            this.keepDurableSubsActive = keepDurableSubsActive;
586            ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
587        }
588    
589        public DestinationInterceptor getDestinationInterceptor() {
590            return destinationInterceptor;
591        }
592    
593        @Override
594        public ConnectionContext getAdminConnectionContext() {
595            return adminConnectionContext;
596        }
597    
    /**
     * Sets the context returned by {@link #getAdminConnectionContext()}.
     *
     * @param adminConnectionContext the admin connection context to store
     */
    @Override
    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
        this.adminConnectionContext = adminConnectionContext;
    }
602    
603        public Map<ConnectionId, ConnectionState> getConnectionStates() {
604            return connectionStates;
605        }
606    
607        @Override
608        public PListStore getTempDataStore() {
609            return brokerService.getTempDataStore();
610        }
611    
612        @Override
613        public URI getVmConnectorURI() {
614            return brokerService.getVmConnectorURI();
615        }
616    
    /**
     * No-op: this broker does nothing when the broker service reports it has started.
     */
    @Override
    public void brokerServiceStarted() {
    }
620    
621        @Override
622        public BrokerService getBrokerService() {
623            return brokerService;
624        }
625    
626        @Override
627        public boolean isExpired(MessageReference messageReference) {
628            boolean expired = false;
629            if (messageReference.isExpired()) {
630                try {
631                    // prevent duplicate expiry processing
632                    Message message = messageReference.getMessage();
633                    synchronized (message) {
634                        expired = stampAsExpired(message);
635                    }
636                } catch (IOException e) {
637                    LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
638                }
639            }
640            return expired;
641        }
642    
643        private boolean stampAsExpired(Message message) throws IOException {
644            boolean stamped=false;
645            if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
646                long expiration=message.getExpiration();
647                message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
648                stamped = true;
649            }
650            return stamped;
651        }
652    
653    
654        @Override
655        public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
656            if (LOG.isDebugEnabled()) {
657                LOG.debug("Message expired " + node);
658            }
659            getRoot().sendToDeadLetterQueue(context, node, subscription);
660        }
661    
662        @Override
663        public void sendToDeadLetterQueue(ConnectionContext context,
664                MessageReference node, Subscription subscription){
665            try{
666                if(node!=null){
667                    Message message=node.getMessage();
668                    if(message!=null && node.getRegionDestination()!=null){
669                        DeadLetterStrategy deadLetterStrategy=((Destination)node
670                                .getRegionDestination()).getDeadLetterStrategy();
671                        if(deadLetterStrategy!=null){
672                            if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
673                                // message may be inflight to other subscriptions so do not modify
674                                message = message.copy();
675                                stampAsExpired(message);
676                                message.setExpiration(0);
677                                if(!message.isPersistent()){
678                                    message.setPersistent(true);
679                                    message.setProperty("originalDeliveryMode",
680                                            "NON_PERSISTENT");
681                                }
682                                // The original destination and transaction id do
683                                // not get filled when the message is first sent,
684                                // it is only populated if the message is routed to
685                                // another destination like the DLQ
686                                ActiveMQDestination deadLetterDestination=deadLetterStrategy
687                                        .getDeadLetterQueueFor(message, subscription);
688                                if (context.getBroker()==null) {
689                                    context.setBroker(getRoot());
690                                }
691                                BrokerSupport.resendNoCopy(context,message,
692                                        deadLetterDestination);
693                            }
694                        } else {
695                            if (LOG.isDebugEnabled()) {
696                                LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
697                                        + message.getMessageId() + ", destination: " + message.getDestination());
698                            }
699                        }
700                    }
701                }
702            }catch(Exception e){
703                LOG.warn("Caught an exception sending to DLQ: "+node,e);
704            }
705        }
706    
707        @Override
708        public Broker getRoot() {
709            try {
710                return getBrokerService().getBroker();
711            } catch (Exception e) {
712                LOG.error("Trying to get Root Broker " + e);
713                throw new RuntimeException("The broker from the BrokerService should not throw an exception");
714            }
715        }
716    
717        /**
718         * @return the broker sequence id
719         */
720        @Override
721        public long getBrokerSequenceId() {
722            synchronized(sequenceGenerator) {
723                return sequenceGenerator.getNextSequenceId();
724            }
725        }
726    
727    
728        @Override
729        public Scheduler getScheduler() {
730            return this.scheduler;
731        }
732    
733        public ThreadPoolExecutor getExecutor() {
734            return this.executor;
735        }
736    
737        @Override
738        public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
739            ActiveMQDestination destination = control.getDestination();
740            try {
741                getRegion(destination).processConsumerControl(consumerExchange, control);
742            } catch (JMSException jmse) {
743                LOG.warn("unmatched destination: " + destination + ", in consumerControl: "  + control);
744            }
745        }
746    
747        protected void addBrokerInClusterUpdate(BrokerInfo info) {
748            List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
749            for (TransportConnector connector : connectors) {
750                if (connector.isUpdateClusterClients()) {
751                    connector.addPeerBroker(info);
752                    connector.updateClientClusterInfo();
753                }
754            }
755        }
756    
757        protected void removeBrokerInClusterUpdate(BrokerInfo info) {
758            List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
759            for (TransportConnector connector : connectors) {
760                if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
761                    connector.removePeerBroker(info);
762                    connector.updateClientClusterInfo();
763                }
764            }
765        }
766    
767        protected void purgeInactiveDestinations() {
768            inactiveDestinationsPurgeLock.writeLock().lock();
769            try {
770                List<Destination> list = new ArrayList<Destination>();
771                Map<ActiveMQDestination, Destination> map = getDestinationMap();
772                if (isAllowTempAutoCreationOnSend()) {
773                    map.putAll(tempQueueRegion.getDestinationMap());
774                    map.putAll(tempTopicRegion.getDestinationMap());
775                }
776                long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
777                long timeStamp = System.currentTimeMillis();
778                for (Destination d : map.values()) {
779                    d.markForGC(timeStamp);
780                    if (d.canGC()) {
781                        list.add(d);
782                        if (maxPurgedDests > 0 && list.size() == maxPurgedDests) {
783                            break;
784                        }
785                    }
786                }
787    
788                if (!list.isEmpty()) {
789                    ConnectionContext context = BrokerSupport.getConnectionContext(this);
790                    context.setBroker(this);
791    
792                    for (Destination dest : list) {
793                        Logger log = LOG;
794                        if (dest instanceof BaseDestination) {
795                            log = ((BaseDestination) dest).getLog();
796                        }
797                        log.info(dest.getName() + " Inactive for longer than " +
798                                 dest.getInactiveTimoutBeforeGC() + " ms - removing ...");
799                        try {
800                            getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
801                        } catch (Exception e) {
802                            LOG.error("Failed to remove inactive destination " + dest, e);
803                        }
804                    }
805                }
806            } finally {
807                inactiveDestinationsPurgeLock.writeLock().unlock();
808            }
809        }
810    
811        public boolean isAllowTempAutoCreationOnSend() {
812            return allowTempAutoCreationOnSend;
813        }
814    
815        public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
816            this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
817        }
818    }