001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import org.apache.activemq.broker.Broker;
020    import org.apache.activemq.broker.BrokerService;
021    import org.apache.activemq.broker.Connection;
022    import org.apache.activemq.broker.ConnectionContext;
023    import org.apache.activemq.broker.ConsumerBrokerExchange;
024    import org.apache.activemq.broker.EmptyBroker;
025    import org.apache.activemq.broker.ProducerBrokerExchange;
026    import org.apache.activemq.broker.TransportConnector;
027    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
028    import org.apache.activemq.broker.region.policy.PolicyMap;
029    import org.apache.activemq.command.*;
030    import org.apache.activemq.state.ConnectionState;
031    import org.apache.activemq.store.kahadb.plist.PListStore;
032    import org.apache.activemq.thread.Scheduler;
033    import org.apache.activemq.thread.TaskRunnerFactory;
034    import org.apache.activemq.usage.SystemUsage;
035    import org.apache.activemq.util.BrokerSupport;
036    import org.apache.activemq.util.IdGenerator;
037    import org.apache.activemq.util.InetAddressUtil;
038    import org.apache.activemq.util.LongSequenceGenerator;
039    import org.apache.activemq.util.ServiceStopper;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    import javax.jms.InvalidClientIDException;
044    import javax.jms.JMSException;
045    import java.io.IOException;
046    import java.net.URI;
047    import java.util.ArrayList;
048    import java.util.Collections;
049    import java.util.HashMap;
050    import java.util.List;
051    import java.util.Locale;
052    import java.util.Map;
053    import java.util.Set;
054    import java.util.concurrent.ConcurrentHashMap;
055    import java.util.concurrent.CopyOnWriteArrayList;
056    import java.util.concurrent.ThreadPoolExecutor;
057    import java.util.concurrent.locks.ReentrantReadWriteLock;
058    
059    /**
060     * Routes Broker operations to the correct messaging regions for processing.
061     *
062     *
063     */
064    public class RegionBroker extends EmptyBroker {
065        public static final String ORIGINAL_EXPIRATION = "originalExpiration";
066        private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
067        private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
068    
069        protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
070        protected DestinationFactory destinationFactory;
071        protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
072    
073        private final Region queueRegion;
074        private final Region topicRegion;
075        private final Region tempQueueRegion;
076        private final Region tempTopicRegion;
077        protected final BrokerService brokerService;
078        private boolean started;
079        private boolean keepDurableSubsActive;
080    
081        private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
082        private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
083        private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
084    
085        private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
086        private BrokerId brokerId;
087        private String brokerName;
088        private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
089        private final DestinationInterceptor destinationInterceptor;
090        private ConnectionContext adminConnectionContext;
091        private final Scheduler scheduler;
092        private final ThreadPoolExecutor executor;
093        private boolean allowTempAutoCreationOnSend;
094    
095        private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
096        private final Runnable purgeInactiveDestinationsTask = new Runnable() {
097            public void run() {
098                purgeInactiveDestinations();
099            }
100        };
101    
102        public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
103                            DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
104            this.brokerService = brokerService;
105            this.executor=executor;
106            this.scheduler = scheduler;
107            if (destinationFactory == null) {
108                throw new IllegalArgumentException("null destinationFactory");
109            }
110            this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
111            this.destinationFactory = destinationFactory;
112            queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
113            topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
114            this.destinationInterceptor = destinationInterceptor;
115            tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
116            tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
117        }
118    
119        @Override
120        public Map<ActiveMQDestination, Destination> getDestinationMap() {
121            Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap());
122            answer.putAll(getTopicRegion().getDestinationMap());
123            return answer;
124        }
125    
126        @Override
127        public Set <Destination> getDestinations(ActiveMQDestination destination) {
128            switch (destination.getDestinationType()) {
129            case ActiveMQDestination.QUEUE_TYPE:
130                return queueRegion.getDestinations(destination);
131            case ActiveMQDestination.TOPIC_TYPE:
132                return topicRegion.getDestinations(destination);
133            case ActiveMQDestination.TEMP_QUEUE_TYPE:
134                return tempQueueRegion.getDestinations(destination);
135            case ActiveMQDestination.TEMP_TOPIC_TYPE:
136                return tempTopicRegion.getDestinations(destination);
137            default:
138                return Collections.emptySet();
139            }
140        }
141    
142        @Override
143        @SuppressWarnings("rawtypes")
144        public Broker getAdaptor(Class type) {
145            if (type.isInstance(this)) {
146                return this;
147            }
148            return null;
149        }
150    
151        public Region getQueueRegion() {
152            return queueRegion;
153        }
154    
155        public Region getTempQueueRegion() {
156            return tempQueueRegion;
157        }
158    
159        public Region getTempTopicRegion() {
160            return tempTopicRegion;
161        }
162    
163        public Region getTopicRegion() {
164            return topicRegion;
165        }
166    
167        protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
168            return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
169        }
170    
171        protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
172            return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
173        }
174    
175        protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
176            return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
177        }
178    
179        protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
180            return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
181        }
182    
183        @Override
184        public void start() throws Exception {
185            started = true;
186            queueRegion.start();
187            topicRegion.start();
188            tempQueueRegion.start();
189            tempTopicRegion.start();
190            int period = this.brokerService.getSchedulePeriodForDestinationPurge();
191            if (period > 0) {
192                this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
193            }
194        }
195    
196        @Override
197        public void stop() throws Exception {
198            started = false;
199            this.scheduler.cancel(purgeInactiveDestinationsTask);
200            ServiceStopper ss = new ServiceStopper();
201            doStop(ss);
202            ss.throwFirstException();
203            // clear the state
204            clientIdSet.clear();
205            connections.clear();
206            destinations.clear();
207            brokerInfos.clear();
208        }
209    
210        public PolicyMap getDestinationPolicy() {
211            return brokerService != null ? brokerService.getDestinationPolicy() : null;
212        }
213    
214        @Override
215        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
216            String clientId = info.getClientId();
217            if (clientId == null) {
218                throw new InvalidClientIDException("No clientID specified for connection request");
219            }
220            synchronized (clientIdSet) {
221                ConnectionContext oldContext = clientIdSet.get(clientId);
222                if (oldContext != null) {
223                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
224                                                       + oldContext.getConnection().getRemoteAddress());
225                } else {
226                    clientIdSet.put(clientId, context);
227                }
228            }
229    
230            connections.add(context.getConnection());
231        }
232    
233        @Override
234        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
235            String clientId = info.getClientId();
236            if (clientId == null) {
237                throw new InvalidClientIDException("No clientID specified for connection disconnect request");
238            }
239            synchronized (clientIdSet) {
240                ConnectionContext oldValue = clientIdSet.get(clientId);
241                // we may be removing the duplicate connection, not the first
242                // connection to be created
243                // so lets check that their connection IDs are the same
244                if (oldValue == context) {
245                    if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
246                        clientIdSet.remove(clientId);
247                    }
248                }
249            }
250            connections.remove(context.getConnection());
251        }
252    
253        protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
254            return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
255        }
256    
257        @Override
258        public Connection[] getClients() throws Exception {
259            ArrayList<Connection> l = new ArrayList<Connection>(connections);
260            Connection rc[] = new Connection[l.size()];
261            l.toArray(rc);
262            return rc;
263        }
264    
265        @Override
266        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {
267    
268            Destination answer;
269    
270            answer = destinations.get(destination);
271            if (answer != null) {
272                return answer;
273            }
274    
275         synchronized (destinations) {
276            answer = destinations.get(destination);
277            if (answer != null) {
278                return answer;
279            }
280    
281            switch (destination.getDestinationType()) {
282            case ActiveMQDestination.QUEUE_TYPE:
283                answer = queueRegion.addDestination(context, destination,true);
284                break;
285            case ActiveMQDestination.TOPIC_TYPE:
286                answer = topicRegion.addDestination(context, destination,true);
287                break;
288            case ActiveMQDestination.TEMP_QUEUE_TYPE:
289                answer = tempQueueRegion.addDestination(context, destination, createIfTemp);
290                break;
291            case ActiveMQDestination.TEMP_TOPIC_TYPE:
292                answer = tempTopicRegion.addDestination(context, destination, createIfTemp);
293                break;
294            default:
295                throw createUnknownDestinationTypeException(destination);
296            }
297    
298            destinations.put(destination, answer);
299            return answer;
300         }
301    
302        }
303    
304        @Override
305        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
306    
307            if (destinations.containsKey(destination)) {
308                switch (destination.getDestinationType()) {
309                case ActiveMQDestination.QUEUE_TYPE:
310                    queueRegion.removeDestination(context, destination, timeout);
311                    break;
312                case ActiveMQDestination.TOPIC_TYPE:
313                    topicRegion.removeDestination(context, destination, timeout);
314                    break;
315                case ActiveMQDestination.TEMP_QUEUE_TYPE:
316                    tempQueueRegion.removeDestination(context, destination, timeout);
317                    break;
318                case ActiveMQDestination.TEMP_TOPIC_TYPE:
319                    tempTopicRegion.removeDestination(context, destination, timeout);
320                    break;
321                default:
322                    throw createUnknownDestinationTypeException(destination);
323                }
324                destinations.remove(destination);
325    
326            }
327    
328        }
329    
330        @Override
331        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
332            addDestination(context, info.getDestination(),true);
333    
334        }
335    
336        @Override
337        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
338            removeDestination(context, info.getDestination(), info.getTimeout());
339    
340        }
341    
342        @Override
343        public ActiveMQDestination[] getDestinations() throws Exception {
344            ArrayList<ActiveMQDestination> l;
345    
346            l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
347    
348            ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
349            l.toArray(rc);
350            return rc;
351        }
352    
353        @Override
354        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
355            ActiveMQDestination destination = info.getDestination();
356            if (destination != null) {
357                inactiveDestinationsPurgeLock.readLock().lock();
358                try {
359                    // This seems to cause the destination to be added but without
360                    // advisories firing...
361                    context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
362                    switch (destination.getDestinationType()) {
363                    case ActiveMQDestination.QUEUE_TYPE:
364                        queueRegion.addProducer(context, info);
365                        break;
366                    case ActiveMQDestination.TOPIC_TYPE:
367                        topicRegion.addProducer(context, info);
368                        break;
369                    case ActiveMQDestination.TEMP_QUEUE_TYPE:
370                        tempQueueRegion.addProducer(context, info);
371                        break;
372                    case ActiveMQDestination.TEMP_TOPIC_TYPE:
373                        tempTopicRegion.addProducer(context, info);
374                        break;
375                    }
376                } finally {
377                    inactiveDestinationsPurgeLock.readLock().unlock();
378                }
379            }
380        }
381    
382        @Override
383        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
384            ActiveMQDestination destination = info.getDestination();
385            if (destination != null) {
386                inactiveDestinationsPurgeLock.readLock().lock();
387                try {
388                    switch (destination.getDestinationType()) {
389                    case ActiveMQDestination.QUEUE_TYPE:
390                        queueRegion.removeProducer(context, info);
391                        break;
392                    case ActiveMQDestination.TOPIC_TYPE:
393                        topicRegion.removeProducer(context, info);
394                        break;
395                    case ActiveMQDestination.TEMP_QUEUE_TYPE:
396                        tempQueueRegion.removeProducer(context, info);
397                        break;
398                    case ActiveMQDestination.TEMP_TOPIC_TYPE:
399                        tempTopicRegion.removeProducer(context, info);
400                        break;
401                    }
402                } finally {
403                    inactiveDestinationsPurgeLock.readLock().unlock();
404                }
405            }
406        }
407    
408        @Override
409        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
410            ActiveMQDestination destination = info.getDestination();
411            if (destinationInterceptor != null) {
412                destinationInterceptor.create(this, context, destination);
413            }
414            inactiveDestinationsPurgeLock.readLock().lock();
415            try {
416                switch (destination.getDestinationType()) {
417                case ActiveMQDestination.QUEUE_TYPE:
418                    return queueRegion.addConsumer(context, info);
419    
420                case ActiveMQDestination.TOPIC_TYPE:
421                    return topicRegion.addConsumer(context, info);
422    
423                case ActiveMQDestination.TEMP_QUEUE_TYPE:
424                    return tempQueueRegion.addConsumer(context, info);
425    
426                case ActiveMQDestination.TEMP_TOPIC_TYPE:
427                    return tempTopicRegion.addConsumer(context, info);
428    
429                default:
430                    throw createUnknownDestinationTypeException(destination);
431                }
432            } finally {
433                inactiveDestinationsPurgeLock.readLock().unlock();
434            }
435        }
436    
437        @Override
438        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
439            ActiveMQDestination destination = info.getDestination();
440            inactiveDestinationsPurgeLock.readLock().lock();
441            try {
442                switch (destination.getDestinationType()) {
443    
444                case ActiveMQDestination.QUEUE_TYPE:
445                    queueRegion.removeConsumer(context, info);
446                    break;
447                case ActiveMQDestination.TOPIC_TYPE:
448                    topicRegion.removeConsumer(context, info);
449                    break;
450                case ActiveMQDestination.TEMP_QUEUE_TYPE:
451                    tempQueueRegion.removeConsumer(context, info);
452                    break;
453                case ActiveMQDestination.TEMP_TOPIC_TYPE:
454                    tempTopicRegion.removeConsumer(context, info);
455                    break;
456                default:
457                    throw createUnknownDestinationTypeException(destination);
458                }
459            } finally {
460                inactiveDestinationsPurgeLock.readLock().unlock();
461            }
462        }
463    
464        @Override
465        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
466            inactiveDestinationsPurgeLock.readLock().lock();
467            try {
468                topicRegion.removeSubscription(context, info);
469            } finally {
470                inactiveDestinationsPurgeLock.readLock().unlock();
471            }
472        }
473    
474        @Override
475        public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
476            message.setBrokerInTime(System.currentTimeMillis());
477            if (producerExchange.isMutable() || producerExchange.getRegion() == null
478                    || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
479                ActiveMQDestination destination = message.getDestination();
480                // ensure the destination is registered with the RegionBroker
481                producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
482                Region region;
483                switch (destination.getDestinationType()) {
484                case ActiveMQDestination.QUEUE_TYPE:
485                    region = queueRegion;
486                    break;
487                case ActiveMQDestination.TOPIC_TYPE:
488                    region = topicRegion;
489                    break;
490                case ActiveMQDestination.TEMP_QUEUE_TYPE:
491                    region = tempQueueRegion;
492                    break;
493                case ActiveMQDestination.TEMP_TOPIC_TYPE:
494                    region = tempTopicRegion;
495                    break;
496                default:
497                    throw createUnknownDestinationTypeException(destination);
498                }
499                producerExchange.setRegion(region);
500                producerExchange.setRegionDestination(null);
501            }
502    
503            producerExchange.getRegion().send(producerExchange, message);
504        }
505    
506        @Override
507        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
508            if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
509                ActiveMQDestination destination = ack.getDestination();
510                Region region;
511                switch (destination.getDestinationType()) {
512                case ActiveMQDestination.QUEUE_TYPE:
513                    region = queueRegion;
514                    break;
515                case ActiveMQDestination.TOPIC_TYPE:
516                    region = topicRegion;
517                    break;
518                case ActiveMQDestination.TEMP_QUEUE_TYPE:
519                    region = tempQueueRegion;
520                    break;
521                case ActiveMQDestination.TEMP_TOPIC_TYPE:
522                    region = tempTopicRegion;
523                    break;
524                default:
525                    throw createUnknownDestinationTypeException(destination);
526                }
527                consumerExchange.setRegion(region);
528            }
529            consumerExchange.getRegion().acknowledge(consumerExchange, ack);
530        }
531    
532        @Override
533        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
534            ActiveMQDestination destination = pull.getDestination();
535            switch (destination.getDestinationType()) {
536            case ActiveMQDestination.QUEUE_TYPE:
537                return queueRegion.messagePull(context, pull);
538    
539            case ActiveMQDestination.TOPIC_TYPE:
540                return topicRegion.messagePull(context, pull);
541    
542            case ActiveMQDestination.TEMP_QUEUE_TYPE:
543                return tempQueueRegion.messagePull(context, pull);
544    
545            case ActiveMQDestination.TEMP_TOPIC_TYPE:
546                return tempTopicRegion.messagePull(context, pull);
547            default:
548                throw createUnknownDestinationTypeException(destination);
549            }
550        }
551    
552        @Override
553        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
554            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
555        }
556    
557        @Override
558        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
559            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
560        }
561    
562        @Override
563        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
564            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
565        }
566    
567        @Override
568        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
569            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
570        }
571    
572        @Override
573        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
574            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
575        }
576    
577        @Override
578        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
579            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
580        }
581    
582        @Override
583        public void gc() {
584            queueRegion.gc();
585            topicRegion.gc();
586        }
587    
588        @Override
589        public BrokerId getBrokerId() {
590            if (brokerId == null) {
591                brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
592            }
593            return brokerId;
594        }
595    
596        public void setBrokerId(BrokerId brokerId) {
597            this.brokerId = brokerId;
598        }
599    
600        @Override
601        public String getBrokerName() {
602            if (brokerName == null) {
603                try {
604                    brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
605                } catch (Exception e) {
606                    brokerName = "localhost";
607                }
608            }
609            return brokerName;
610        }
611    
612        public void setBrokerName(String brokerName) {
613            this.brokerName = brokerName;
614        }
615    
616        public DestinationStatistics getDestinationStatistics() {
617            return destinationStatistics;
618        }
619    
620        protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
621            return new JMSException("Unknown destination type: " + destination.getDestinationType());
622        }
623    
624        @Override
625        public synchronized void addBroker(Connection connection, BrokerInfo info) {
626            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
627            if (existing == null) {
628                existing = info.copy();
629                existing.setPeerBrokerInfos(null);
630                brokerInfos.put(info.getBrokerId(), existing);
631            }
632            existing.incrementRefCount();
633            if (LOG.isDebugEnabled()) {
634                LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
635            }
636            addBrokerInClusterUpdate(info);
637        }
638    
639        @Override
640        public synchronized void removeBroker(Connection connection, BrokerInfo info) {
641            if (info != null) {
642                BrokerInfo existing = brokerInfos.get(info.getBrokerId());
643                if (existing != null && existing.decrementRefCount() == 0) {
644                   brokerInfos.remove(info.getBrokerId());
645                }
646                if (LOG.isDebugEnabled()) {
647                    LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
648                }
649                removeBrokerInClusterUpdate(info);
650            }
651        }
652    
653        @Override
654        public synchronized BrokerInfo[] getPeerBrokerInfos() {
655            BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
656            result = brokerInfos.values().toArray(result);
657            return result;
658        }
659    
660        @Override
661        public void preProcessDispatch(MessageDispatch messageDispatch) {
662            Message message = messageDispatch.getMessage();
663            if (message != null) {
664                long endTime = System.currentTimeMillis();
665                message.setBrokerOutTime(endTime);
666                if (getBrokerService().isEnableStatistics()) {
667                    long totalTime = endTime - message.getBrokerInTime();
668                    message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
669                }
670            }
671        }
672    
673        @Override
674        public void postProcessDispatch(MessageDispatch messageDispatch) {
675        }
676    
677        @Override
678        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
679            ActiveMQDestination destination = messageDispatchNotification.getDestination();
680            switch (destination.getDestinationType()) {
681            case ActiveMQDestination.QUEUE_TYPE:
682                queueRegion.processDispatchNotification(messageDispatchNotification);
683                break;
684            case ActiveMQDestination.TOPIC_TYPE:
685                topicRegion.processDispatchNotification(messageDispatchNotification);
686                break;
687            case ActiveMQDestination.TEMP_QUEUE_TYPE:
688                tempQueueRegion.processDispatchNotification(messageDispatchNotification);
689                break;
690            case ActiveMQDestination.TEMP_TOPIC_TYPE:
691                tempTopicRegion.processDispatchNotification(messageDispatchNotification);
692                break;
693            default:
694                throw createUnknownDestinationTypeException(destination);
695            }
696        }
697    
698        public boolean isSlaveBroker() {
699            return brokerService.isSlave();
700        }
701    
702        @Override
703        public boolean isStopped() {
704            return !started;
705        }
706    
707        @Override
708        public Set<ActiveMQDestination> getDurableDestinations() {
709            return destinationFactory.getDestinations();
710        }
711    
712        protected void doStop(ServiceStopper ss) {
713            ss.stop(queueRegion);
714            ss.stop(topicRegion);
715            ss.stop(tempQueueRegion);
716            ss.stop(tempTopicRegion);
717        }
718    
719        public boolean isKeepDurableSubsActive() {
720            return keepDurableSubsActive;
721        }
722    
723        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
724            this.keepDurableSubsActive = keepDurableSubsActive;
725            ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
726        }
727    
728        public DestinationInterceptor getDestinationInterceptor() {
729            return destinationInterceptor;
730        }
731    
732        @Override
733        public ConnectionContext getAdminConnectionContext() {
734            return adminConnectionContext;
735        }
736    
737        @Override
738        public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
739            this.adminConnectionContext = adminConnectionContext;
740        }
741    
742        public Map<ConnectionId, ConnectionState> getConnectionStates() {
743            return connectionStates;
744        }
745    
746        @Override
747        public PListStore getTempDataStore() {
748            return brokerService.getTempDataStore();
749        }
750    
751        @Override
752        public URI getVmConnectorURI() {
753            return brokerService.getVmConnectorURI();
754        }
755    
756        @Override
757        public void brokerServiceStarted() {
758        }
759    
760        @Override
761        public BrokerService getBrokerService() {
762            return brokerService;
763        }
764    
765        @Override
766        public boolean isExpired(MessageReference messageReference) {
767            boolean expired = false;
768            if (messageReference.isExpired()) {
769                try {
770                    // prevent duplicate expiry processing
771                    Message message = messageReference.getMessage();
772                    synchronized (message) {
773                        expired = stampAsExpired(message);
774                    }
775                } catch (IOException e) {
776                    LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
777                }
778            }
779            return expired;
780        }
781    
782        private boolean stampAsExpired(Message message) throws IOException {
783            boolean stamped=false;
784            if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
785                long expiration=message.getExpiration();
786                message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
787                stamped = true;
788            }
789            return stamped;
790        }
791    
792    
793        @Override
794        public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
795            if (LOG.isDebugEnabled()) {
796                LOG.debug("Message expired " + node);
797            }
798            getRoot().sendToDeadLetterQueue(context, node, subscription);
799        }
800    
801        @Override
802        public void sendToDeadLetterQueue(ConnectionContext context,
803                MessageReference node, Subscription subscription){
804            try{
805                if(node!=null){
806                    Message message=node.getMessage();
807                    if(message!=null && node.getRegionDestination()!=null){
808                        DeadLetterStrategy deadLetterStrategy=node
809                                .getRegionDestination().getDeadLetterStrategy();
810                        if(deadLetterStrategy!=null){
811                            if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
812                                // message may be inflight to other subscriptions so do not modify
813                                message = message.copy();
814                                stampAsExpired(message);
815                                message.setExpiration(0);
816                                if(!message.isPersistent()){
817                                    message.setPersistent(true);
818                                    message.setProperty("originalDeliveryMode",
819                                            "NON_PERSISTENT");
820                                }
821                                // The original destination and transaction id do
822                                // not get filled when the message is first sent,
823                                // it is only populated if the message is routed to
824                                // another destination like the DLQ
825                                ActiveMQDestination deadLetterDestination=deadLetterStrategy
826                                        .getDeadLetterQueueFor(message, subscription);
827                                if (context.getBroker()==null) {
828                                    context.setBroker(getRoot());
829                                }
830                                BrokerSupport.resendNoCopy(context,message,
831                                        deadLetterDestination);
832                            }
833                        } else {
834                            if (LOG.isDebugEnabled()) {
835                                LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
836                                        + message.getMessageId() + ", destination: " + message.getDestination());
837                            }
838                        }
839                    }
840                }
841            }catch(Exception e){
842                LOG.warn("Caught an exception sending to DLQ: "+node,e);
843            }
844        }
845    
846        @Override
847        public Broker getRoot() {
848            try {
849                return getBrokerService().getBroker();
850            } catch (Exception e) {
851                LOG.error("Trying to get Root Broker " + e);
852                throw new RuntimeException("The broker from the BrokerService should not throw an exception");
853            }
854        }
855    
856        /**
857         * @return the broker sequence id
858         */
859        @Override
860        public long getBrokerSequenceId() {
861            synchronized(sequenceGenerator) {
862                return sequenceGenerator.getNextSequenceId();
863            }
864        }
865    
866    
867        @Override
868        public Scheduler getScheduler() {
869            return this.scheduler;
870        }
871    
872        public ThreadPoolExecutor getExecutor() {
873            return this.executor;
874        }
875    
876        @Override
877        public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
878            ActiveMQDestination destination = control.getDestination();
879            switch (destination.getDestinationType()) {
880            case ActiveMQDestination.QUEUE_TYPE:
881                queueRegion.processConsumerControl(consumerExchange, control);
882                break;
883    
884            case ActiveMQDestination.TOPIC_TYPE:
885                topicRegion.processConsumerControl(consumerExchange, control);
886                break;
887    
888            case ActiveMQDestination.TEMP_QUEUE_TYPE:
889                tempQueueRegion.processConsumerControl(consumerExchange, control);
890                break;
891    
892            case ActiveMQDestination.TEMP_TOPIC_TYPE:
893                tempTopicRegion.processConsumerControl(consumerExchange, control);
894                break;
895    
896            default:
897                LOG.warn("unmatched destination: " + destination + ", in consumerControl: "  + control);
898            }
899        }
900    
901        protected void addBrokerInClusterUpdate(BrokerInfo info) {
902            List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
903            for (TransportConnector connector : connectors) {
904                if (connector.isUpdateClusterClients()) {
905                    connector.addPeerBroker(info);
906                    connector.updateClientClusterInfo();
907                }
908            }
909        }
910    
911        protected void removeBrokerInClusterUpdate(BrokerInfo info) {
912            List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
913            for (TransportConnector connector : connectors) {
914                if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
915                    connector.removePeerBroker(info);
916                    connector.updateClientClusterInfo();
917                }
918            }
919        }
920    
921        protected void purgeInactiveDestinations() {
922            inactiveDestinationsPurgeLock.writeLock().lock();
923            try {
924                List<Destination> list = new ArrayList<Destination>();
925                Map<ActiveMQDestination, Destination> map = getDestinationMap();
926                if (isAllowTempAutoCreationOnSend()) {
927                    map.putAll(tempQueueRegion.getDestinationMap());
928                    map.putAll(tempTopicRegion.getDestinationMap());
929                }
930                long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
931                long timeStamp = System.currentTimeMillis();
932                for (Destination d : map.values()) {
933                    d.markForGC(timeStamp);
934                    if (d.canGC()) {
935                        list.add(d);
936                        if (maxPurgedDests > 0 && list.size() == maxPurgedDests) {
937                            break;
938                        }
939                    }
940                }
941    
942                if (!list.isEmpty()) {
943                    ConnectionContext context = BrokerSupport.getConnectionContext(this);
944                    context.setBroker(this);
945    
946                    for (Destination dest : list) {
947                        Logger log = LOG;
948                        if (dest instanceof BaseDestination) {
949                            log = ((BaseDestination) dest).getLog();
950                        }
951                        log.info(dest.getName() + " Inactive for longer than " +
952                                 dest.getInactiveTimoutBeforeGC() + " ms - removing ...");
953                        try {
954                            getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
955                        } catch (Exception e) {
956                            LOG.error("Failed to remove inactive destination " + dest, e);
957                        }
958                    }
959                }
960            } finally {
961                inactiveDestinationsPurgeLock.writeLock().unlock();
962            }
963        }
964    
965        public boolean isAllowTempAutoCreationOnSend() {
966            return allowTempAutoCreationOnSend;
967        }
968    
969        public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
970            this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
971        }
972    }