001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.net.URI;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Locale;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.CopyOnWriteArrayList;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.locks.ReentrantReadWriteLock;
032
033import javax.jms.InvalidClientIDException;
034import javax.jms.JMSException;
035
036import org.apache.activemq.broker.Broker;
037import org.apache.activemq.broker.BrokerService;
038import org.apache.activemq.broker.Connection;
039import org.apache.activemq.broker.ConnectionContext;
040import org.apache.activemq.broker.ConsumerBrokerExchange;
041import org.apache.activemq.broker.EmptyBroker;
042import org.apache.activemq.broker.ProducerBrokerExchange;
043import org.apache.activemq.broker.TransportConnection;
044import org.apache.activemq.broker.TransportConnector;
045import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
046import org.apache.activemq.broker.region.policy.PolicyMap;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.ActiveMQMessage;
049import org.apache.activemq.command.BrokerId;
050import org.apache.activemq.command.BrokerInfo;
051import org.apache.activemq.command.ConnectionId;
052import org.apache.activemq.command.ConnectionInfo;
053import org.apache.activemq.command.ConsumerControl;
054import org.apache.activemq.command.ConsumerInfo;
055import org.apache.activemq.command.DestinationInfo;
056import org.apache.activemq.command.Message;
057import org.apache.activemq.command.MessageAck;
058import org.apache.activemq.command.MessageDispatch;
059import org.apache.activemq.command.MessageDispatchNotification;
060import org.apache.activemq.command.MessagePull;
061import org.apache.activemq.command.ProducerInfo;
062import org.apache.activemq.command.RemoveSubscriptionInfo;
063import org.apache.activemq.command.Response;
064import org.apache.activemq.command.TransactionId;
065import org.apache.activemq.state.ConnectionState;
066import org.apache.activemq.store.PListStore;
067import org.apache.activemq.thread.Scheduler;
068import org.apache.activemq.thread.TaskRunnerFactory;
069import org.apache.activemq.transport.TransmitCallback;
070import org.apache.activemq.usage.SystemUsage;
071import org.apache.activemq.util.BrokerSupport;
072import org.apache.activemq.util.IdGenerator;
073import org.apache.activemq.util.InetAddressUtil;
074import org.apache.activemq.util.LongSequenceGenerator;
075import org.apache.activemq.util.ServiceStopper;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079/**
080 * Routes Broker operations to the correct messaging regions for processing.
081 */
082public class RegionBroker extends EmptyBroker {
    /** Message property under which {@code stampAsExpired} records a message's expiration value. */
    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
    private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
    // Supplies the id used by getBrokerId() when no id has been set explicitly.
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();

    // Single statistics object handed to all four regions when they are created.
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    // Validated as non-null and assigned in the constructor; also backs getDurableDestinations().
    protected DestinationFactory destinationFactory;
    // Synchronized map exposed as-is through getConnectionStates().
    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());

    // One region per destination type; getRegion() picks among them.
    private final Region queueRegion;
    private final Region topicRegion;
    private final Region tempQueueRegion;
    private final Region tempTopicRegion;
    protected final BrokerService brokerService;
    // Set to true by start() and to false by stop(); isStopped() reports the negation.
    private boolean started;
    private boolean keepDurableSubsActive;

    // Connections registered by addConnection(); getClients() returns a copy of this list.
    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
    // Serves as both the monitor and the "creation in progress" marker map in addDestination().
    private final Map<ActiveMQDestination, ActiveMQDestination> destinationGate = new HashMap<ActiveMQDestination, ActiveMQDestination>();
    // Destinations already created through addDestination(), keyed by their ActiveMQDestination.
    private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
    // Reference-counted peer broker infos; addBroker/removeBroker/getPeerBrokerInfos are synchronized on this broker.
    private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();

    // Seeded in the constructor from destinationFactory.getLastMessageBrokerSequenceId().
    private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
    // Lazily generated by getBrokerId() when still null.
    private BrokerId brokerId;
    // Lazily derived from the local host name by getBrokerName() when still null.
    private String brokerName;
    // Client id -> owning context; addConnection() and removeConnection() synchronize on this map.
    private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
    private final DestinationInterceptor destinationInterceptor;
    private ConnectionContext adminConnectionContext;
    // Runs purgeInactiveDestinationsTask periodically once start() has scheduled it.
    private final Scheduler scheduler;
    private final ThreadPoolExecutor executor;
    private boolean allowTempAutoCreationOnSend;

    // Producer/consumer/subscription add and remove operations hold the read lock.
    // NOTE(review): presumably purgeInactiveDestinations() takes the write lock - confirm, it is outside this view.
    private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
    // Scheduled by start() when the configured purge period is positive; cancelled by stop().
    private final Runnable purgeInactiveDestinationsTask = new Runnable() {
        @Override
        public void run() {
            purgeInactiveDestinations();
        }
    };
121
122    public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
123        DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException {
124        this.brokerService = brokerService;
125        this.executor = executor;
126        this.scheduler = scheduler;
127        if (destinationFactory == null) {
128            throw new IllegalArgumentException("null destinationFactory");
129        }
130        this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
131        this.destinationFactory = destinationFactory;
132        queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
133        topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
134        this.destinationInterceptor = destinationInterceptor;
135        tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
136        tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
137    }
138
139    @Override
140    public Map<ActiveMQDestination, Destination> getDestinationMap() {
141        Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap());
142        answer.putAll(getTopicRegion().getDestinationMap());
143        return answer;
144    }
145
146    @Override
147    public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination) {
148        try {
149            return getRegion(destination).getDestinationMap();
150        } catch (JMSException jmse) {
151            return Collections.emptyMap();
152        }
153    }
154
155    @Override
156    public Set<Destination> getDestinations(ActiveMQDestination destination) {
157        try {
158            return getRegion(destination).getDestinations(destination);
159        } catch (JMSException jmse) {
160            return Collections.emptySet();
161        }
162    }
163
164    @Override
165    @SuppressWarnings("rawtypes")
166    public Broker getAdaptor(Class type) {
167        if (type.isInstance(this)) {
168            return this;
169        }
170        return null;
171    }
172
173    public Region getQueueRegion() {
174        return queueRegion;
175    }
176
177    public Region getTempQueueRegion() {
178        return tempQueueRegion;
179    }
180
181    public Region getTempTopicRegion() {
182        return tempTopicRegion;
183    }
184
185    public Region getTopicRegion() {
186        return topicRegion;
187    }
188
189    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
190        return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
191    }
192
193    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
194        return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
195    }
196
197    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
198        return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
199    }
200
201    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
202        return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
203    }
204
205    @Override
206    public void start() throws Exception {
207        started = true;
208        queueRegion.start();
209        topicRegion.start();
210        tempQueueRegion.start();
211        tempTopicRegion.start();
212        int period = this.brokerService.getSchedulePeriodForDestinationPurge();
213        if (period > 0) {
214            this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
215        }
216    }
217
218    @Override
219    public void stop() throws Exception {
220        started = false;
221        this.scheduler.cancel(purgeInactiveDestinationsTask);
222        ServiceStopper ss = new ServiceStopper();
223        doStop(ss);
224        ss.throwFirstException();
225        // clear the state
226        clientIdSet.clear();
227        connections.clear();
228        destinations.clear();
229        brokerInfos.clear();
230    }
231
232    public PolicyMap getDestinationPolicy() {
233        return brokerService != null ? brokerService.getDestinationPolicy() : null;
234    }
235
236    public ConnectionContext getConnectionContext(String clientId) {
237        return clientIdSet.get(clientId);
238    }
239
240    @Override
241    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
242        String clientId = info.getClientId();
243        if (clientId == null) {
244            throw new InvalidClientIDException("No clientID specified for connection request");
245        }
246
247        ConnectionContext oldContext = null;
248
249        synchronized (clientIdSet) {
250            oldContext = clientIdSet.get(clientId);
251            if (oldContext != null) {
252                if (context.isAllowLinkStealing()) {
253                    clientIdSet.put(clientId, context);
254                } else {
255                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
256                        + oldContext.getConnection().getRemoteAddress());
257                }
258            } else {
259                clientIdSet.put(clientId, context);
260            }
261        }
262
263        if (oldContext != null) {
264            if (oldContext.getConnection() != null) {
265                Connection connection = oldContext.getConnection();
266                LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
267                if (connection instanceof TransportConnection) {
268                    TransportConnection transportConnection = (TransportConnection) connection;
269                    transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId()));
270                } else {
271                    connection.stop();
272                }
273            } else {
274                LOG.error("No Connection found for {}", oldContext);
275            }
276        }
277
278        connections.add(context.getConnection());
279    }
280
281    @Override
282    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
283        String clientId = info.getClientId();
284        if (clientId == null) {
285            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
286        }
287        synchronized (clientIdSet) {
288            ConnectionContext oldValue = clientIdSet.get(clientId);
289            // we may be removing the duplicate connection, not the first connection to be created
290            // so lets check that their connection IDs are the same
291            if (oldValue == context) {
292                if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
293                    clientIdSet.remove(clientId);
294                }
295            }
296        }
297        connections.remove(context.getConnection());
298    }
299
300    protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
301        return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
302    }
303
304    @Override
305    public Connection[] getClients() throws Exception {
306        ArrayList<Connection> l = new ArrayList<Connection>(connections);
307        Connection rc[] = new Connection[l.size()];
308        l.toArray(rc);
309        return rc;
310    }
311
    /**
     * Returns the {@link Destination} for {@code destination}, creating it through the
     * responsible region when it is not yet in the local {@code destinations} cache.
     * <p>
     * NOTE(review): within the visible code an entry is only put into {@code destinationGate}
     * after a thread has already waited on it, so the waiting branch looks unreachable unless
     * some code outside this view populates the gate - confirm before relying on it to
     * serialise concurrent creation of the same destination.
     *
     * @param createIfTemp for temporary destinations, passed to the region as its create flag;
     *        non-temporary destinations always pass {@code true}
     * @return the cached or newly added destination
     * @throws Exception whatever the region raises, or an {@code InterruptedException} from
     *         waiting on the gate
     */
    @Override
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {

        Destination answer;

        // Fast path: no locking when the destination is already cached.
        answer = destinations.get(destination);
        if (answer != null) {
            return answer;
        }

        synchronized (destinationGate) {
            // Re-check under the gate monitor in case another thread finished in the meantime.
            answer = destinations.get(destination);
            if (answer != null) {
                return answer;
            }

            if (destinationGate.get(destination) != null) {
                // Guard against spurious wakeup.
                while (destinationGate.containsKey(destination)) {
                    destinationGate.wait();
                }
                answer = destinations.get(destination);
                if (answer != null) {
                    return answer;
                } else {
                    // In case of intermediate remove or add failure
                    destinationGate.put(destination, destination);
                }
            }
        }

        try {
            boolean create = true;
            if (destination.isTemporary()) {
                create = createIfTemp;
            }
            answer = getRegion(destination).addDestination(context, destination, create);
            destinations.put(destination, answer);
        } finally {
            // Always release the gate and wake waiters, whether or not creation succeeded.
            synchronized (destinationGate) {
                destinationGate.remove(destination);
                destinationGate.notifyAll();
            }
        }

        return answer;
    }
359
360    @Override
361    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
362        if (destinations.containsKey(destination)) {
363            getRegion(destination).removeDestination(context, destination, timeout);
364            destinations.remove(destination);
365        }
366    }
367
368    @Override
369    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
370        addDestination(context, info.getDestination(), true);
371
372    }
373
374    @Override
375    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
376        removeDestination(context, info.getDestination(), info.getTimeout());
377    }
378
379    @Override
380    public ActiveMQDestination[] getDestinations() throws Exception {
381        ArrayList<ActiveMQDestination> l;
382
383        l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
384
385        ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
386        l.toArray(rc);
387        return rc;
388    }
389
390    @Override
391    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
392        ActiveMQDestination destination = info.getDestination();
393        if (destination != null) {
394            inactiveDestinationsPurgeLock.readLock().lock();
395            try {
396                // This seems to cause the destination to be added but without
397                // advisories firing...
398                context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
399                getRegion(destination).addProducer(context, info);
400            } finally {
401                inactiveDestinationsPurgeLock.readLock().unlock();
402            }
403        }
404    }
405
406    @Override
407    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
408        ActiveMQDestination destination = info.getDestination();
409        if (destination != null) {
410            inactiveDestinationsPurgeLock.readLock().lock();
411            try {
412                getRegion(destination).removeProducer(context, info);
413            } finally {
414                inactiveDestinationsPurgeLock.readLock().unlock();
415            }
416        }
417    }
418
419    @Override
420    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
421        ActiveMQDestination destination = info.getDestination();
422        if (destinationInterceptor != null) {
423            destinationInterceptor.create(this, context, destination);
424        }
425        inactiveDestinationsPurgeLock.readLock().lock();
426        try {
427            return getRegion(destination).addConsumer(context, info);
428        } finally {
429            inactiveDestinationsPurgeLock.readLock().unlock();
430        }
431    }
432
433    @Override
434    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
435        ActiveMQDestination destination = info.getDestination();
436        inactiveDestinationsPurgeLock.readLock().lock();
437        try {
438            getRegion(destination).removeConsumer(context, info);
439        } finally {
440            inactiveDestinationsPurgeLock.readLock().unlock();
441        }
442    }
443
444    @Override
445    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
446        inactiveDestinationsPurgeLock.readLock().lock();
447        try {
448            topicRegion.removeSubscription(context, info);
449        } finally {
450            inactiveDestinationsPurgeLock.readLock().unlock();
451
452        }
453    }
454
455    @Override
456    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
457        ActiveMQDestination destination = message.getDestination();
458        message.setBrokerInTime(System.currentTimeMillis());
459        if (producerExchange.isMutable() || producerExchange.getRegion() == null
460            || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
461            // ensure the destination is registered with the RegionBroker
462            producerExchange.getConnectionContext().getBroker()
463                .addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
464            producerExchange.setRegion(getRegion(destination));
465            producerExchange.setRegionDestination(null);
466        }
467
468        producerExchange.getRegion().send(producerExchange, message);
469
470        // clean up so these references aren't kept (possible leak) in the producer exchange
471        // especially since temps are transitory
472        if (producerExchange.isMutable()) {
473            producerExchange.setRegionDestination(null);
474            producerExchange.setRegion(null);
475        }
476    }
477
478    @Override
479    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
480        if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
481            ActiveMQDestination destination = ack.getDestination();
482            consumerExchange.setRegion(getRegion(destination));
483        }
484        consumerExchange.getRegion().acknowledge(consumerExchange, ack);
485    }
486
487    protected Region getRegion(ActiveMQDestination destination) throws JMSException {
488        switch (destination.getDestinationType()) {
489            case ActiveMQDestination.QUEUE_TYPE:
490                return queueRegion;
491            case ActiveMQDestination.TOPIC_TYPE:
492                return topicRegion;
493            case ActiveMQDestination.TEMP_QUEUE_TYPE:
494                return tempQueueRegion;
495            case ActiveMQDestination.TEMP_TOPIC_TYPE:
496                return tempTopicRegion;
497            default:
498                throw createUnknownDestinationTypeException(destination);
499        }
500    }
501
502    @Override
503    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
504        ActiveMQDestination destination = pull.getDestination();
505        return getRegion(destination).messagePull(context, pull);
506    }
507
508    @Override
509    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
510        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
511    }
512
513    @Override
514    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
515        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
516    }
517
518    @Override
519    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
520        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
521    }
522
523    @Override
524    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
525        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
526    }
527
528    @Override
529    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
530        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
531    }
532
533    @Override
534    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
535        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
536    }
537
538    @Override
539    public void gc() {
540        queueRegion.gc();
541        topicRegion.gc();
542    }
543
544    @Override
545    public BrokerId getBrokerId() {
546        if (brokerId == null) {
547            brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
548        }
549        return brokerId;
550    }
551
552    public void setBrokerId(BrokerId brokerId) {
553        this.brokerId = brokerId;
554    }
555
556    @Override
557    public String getBrokerName() {
558        if (brokerName == null) {
559            try {
560                brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
561            } catch (Exception e) {
562                brokerName = "localhost";
563            }
564        }
565        return brokerName;
566    }
567
568    public void setBrokerName(String brokerName) {
569        this.brokerName = brokerName;
570    }
571
572    public DestinationStatistics getDestinationStatistics() {
573        return destinationStatistics;
574    }
575
576    protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
577        return new JMSException("Unknown destination type: " + destination.getDestinationType());
578    }
579
580    @Override
581    public synchronized void addBroker(Connection connection, BrokerInfo info) {
582        BrokerInfo existing = brokerInfos.get(info.getBrokerId());
583        if (existing == null) {
584            existing = info.copy();
585            existing.setPeerBrokerInfos(null);
586            brokerInfos.put(info.getBrokerId(), existing);
587        }
588        existing.incrementRefCount();
589        LOG.debug("{} addBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size() });
590        addBrokerInClusterUpdate(info);
591    }
592
593    @Override
594    public synchronized void removeBroker(Connection connection, BrokerInfo info) {
595        if (info != null) {
596            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
597            if (existing != null && existing.decrementRefCount() == 0) {
598                brokerInfos.remove(info.getBrokerId());
599            }
600            LOG.debug("{} removeBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size()});
601            // When stopping don't send cluster updates since we are the one's tearing down
602            // our own bridges.
603            if (!brokerService.isStopping()) {
604                removeBrokerInClusterUpdate(info);
605            }
606        }
607    }
608
609    @Override
610    public synchronized BrokerInfo[] getPeerBrokerInfos() {
611        BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
612        result = brokerInfos.values().toArray(result);
613        return result;
614    }
615
616    @Override
617    public void preProcessDispatch(final MessageDispatch messageDispatch) {
618        final Message message = messageDispatch.getMessage();
619        if (message != null) {
620            long endTime = System.currentTimeMillis();
621            message.setBrokerOutTime(endTime);
622            if (getBrokerService().isEnableStatistics()) {
623                long totalTime = endTime - message.getBrokerInTime();
624                ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
625            }
626            if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered()) {
627                final int originalValue = message.getRedeliveryCounter();
628                message.incrementRedeliveryCounter();
629                try {
630                    if (message.isPersistent()) {
631                        ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
632                    }
633                    messageDispatch.setTransmitCallback(new TransmitCallback() {
634                        // dispatch is considered a delivery, so update sub state post dispatch otherwise
635                        // on a disconnect/reconnect cached messages will not reflect initial delivery attempt
636                        final TransmitCallback delegate = messageDispatch.getTransmitCallback();
637                        @Override
638                        public void onSuccess() {
639                            message.incrementRedeliveryCounter();
640                            if (delegate != null) {
641                                delegate.onSuccess();
642                            }
643                        }
644
645                        @Override
646                        public void onFailure() {
647                            if (delegate != null) {
648                                delegate.onFailure();
649                            }
650                        }
651                    });
652                } catch (IOException error) {
653                    RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error);
654                    LOG.warn(runtimeException.getLocalizedMessage(), runtimeException);
655                    throw runtimeException;
656                } finally {
657                    message.setRedeliveryCounter(originalValue);
658                }
659            }
660        }
661    }
662
663    @Override
664    public void postProcessDispatch(MessageDispatch messageDispatch) {
665    }
666
667    @Override
668    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
669        ActiveMQDestination destination = messageDispatchNotification.getDestination();
670        getRegion(destination).processDispatchNotification(messageDispatchNotification);
671    }
672
673    @Override
674    public boolean isStopped() {
675        return !started;
676    }
677
678    @Override
679    public Set<ActiveMQDestination> getDurableDestinations() {
680        return destinationFactory.getDestinations();
681    }
682
683    protected void doStop(ServiceStopper ss) {
684        ss.stop(queueRegion);
685        ss.stop(topicRegion);
686        ss.stop(tempQueueRegion);
687        ss.stop(tempTopicRegion);
688    }
689
690    public boolean isKeepDurableSubsActive() {
691        return keepDurableSubsActive;
692    }
693
694    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
695        this.keepDurableSubsActive = keepDurableSubsActive;
696        ((TopicRegion) topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
697    }
698
699    public DestinationInterceptor getDestinationInterceptor() {
700        return destinationInterceptor;
701    }
702
703    @Override
704    public ConnectionContext getAdminConnectionContext() {
705        return adminConnectionContext;
706    }
707
708    @Override
709    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
710        this.adminConnectionContext = adminConnectionContext;
711    }
712
713    public Map<ConnectionId, ConnectionState> getConnectionStates() {
714        return connectionStates;
715    }
716
717    @Override
718    public PListStore getTempDataStore() {
719        return brokerService.getTempDataStore();
720    }
721
722    @Override
723    public URI getVmConnectorURI() {
724        return brokerService.getVmConnectorURI();
725    }
726
727    @Override
728    public void brokerServiceStarted() {
729    }
730
731    @Override
732    public BrokerService getBrokerService() {
733        return brokerService;
734    }
735
736    @Override
737    public boolean isExpired(MessageReference messageReference) {
738        boolean expired = false;
739        if (messageReference.isExpired()) {
740            try {
741                // prevent duplicate expiry processing
742                Message message = messageReference.getMessage();
743                synchronized (message) {
744                    expired = stampAsExpired(message);
745                }
746            } catch (IOException e) {
747                LOG.warn("unexpected exception on message expiry determination for: {}", messageReference, e);
748            }
749        }
750        return expired;
751    }
752
753    private boolean stampAsExpired(Message message) throws IOException {
754        boolean stamped = false;
755        if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
756            long expiration = message.getExpiration();
757            message.setProperty(ORIGINAL_EXPIRATION, new Long(expiration));
758            stamped = true;
759        }
760        return stamped;
761    }
762
763    @Override
764    public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
765        LOG.debug("Message expired {}", node);
766        getRoot().sendToDeadLetterQueue(context, node, subscription, new Throwable("Message Expired. Expiration:" + node.getExpiration()));
767    }
768
769    @Override
770    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription, Throwable poisonCause) {
771        try {
772            if (node != null) {
773                Message message = node.getMessage();
774                if (message != null && node.getRegionDestination() != null) {
775                    DeadLetterStrategy deadLetterStrategy = ((Destination) node.getRegionDestination()).getDeadLetterStrategy();
776                    if (deadLetterStrategy != null) {
777                        if (deadLetterStrategy.isSendToDeadLetterQueue(message)) {
778                            ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription);
779                            // Prevent a DLQ loop where same message is sent from a DLQ back to itself
780                            if (deadLetterDestination.equals(message.getDestination())) {
781                                LOG.debug("Not re-adding to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
782                                return false;
783                            }
784
785                            // message may be inflight to other subscriptions so do not modify
786                            message = message.copy();
787                            long dlqExpiration = deadLetterStrategy.getExpiration();
788                            if (dlqExpiration > 0) {
789                                dlqExpiration += System.currentTimeMillis();
790                            } else {
791                                stampAsExpired(message);
792                            }
793                            message.setExpiration(dlqExpiration);
794                            if (!message.isPersistent()) {
795                                message.setPersistent(true);
796                                message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
797                            }
798                            if (poisonCause != null) {
799                                message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
800                                        poisonCause.toString());
801                            }
802                            // The original destination and transaction id do
803                            // not get filled when the message is first sent,
804                            // it is only populated if the message is routed to
805                            // another destination like the DLQ
806                            ConnectionContext adminContext = context;
807                            if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) {
808                                adminContext = BrokerSupport.getConnectionContext(this);
809                            }
810                            addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ();
811                            BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination);
812                            return true;
813                        }
814                    } else {
815                        LOG.debug("Dead Letter message with no DLQ strategy in place, message id: {}, destination: {}", message.getMessageId(), message.getDestination());
816                    }
817                }
818            }
819        } catch (Exception e) {
820            LOG.warn("Caught an exception sending to DLQ: {}", node, e);
821        }
822
823        return false;
824    }
825
826    @Override
827    public Broker getRoot() {
828        try {
829            return getBrokerService().getBroker();
830        } catch (Exception e) {
831            LOG.error("Trying to get Root Broker", e);
832            throw new RuntimeException("The broker from the BrokerService should not throw an exception");
833        }
834    }
835
836    /**
837     * @return the broker sequence id
838     */
839    @Override
840    public long getBrokerSequenceId() {
841        synchronized (sequenceGenerator) {
842            return sequenceGenerator.getNextSequenceId();
843        }
844    }
845
846    @Override
847    public Scheduler getScheduler() {
848        return this.scheduler;
849    }
850
851    @Override
852    public ThreadPoolExecutor getExecutor() {
853        return this.executor;
854    }
855
856    @Override
857    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
858        ActiveMQDestination destination = control.getDestination();
859        try {
860            getRegion(destination).processConsumerControl(consumerExchange, control);
861        } catch (JMSException jmse) {
862            LOG.warn("unmatched destination: {}, in consumerControl: {}", destination, control);
863        }
864    }
865
866    protected void addBrokerInClusterUpdate(BrokerInfo info) {
867        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
868        for (TransportConnector connector : connectors) {
869            if (connector.isUpdateClusterClients()) {
870                connector.addPeerBroker(info);
871                connector.updateClientClusterInfo();
872            }
873        }
874    }
875
876    protected void removeBrokerInClusterUpdate(BrokerInfo info) {
877        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
878        for (TransportConnector connector : connectors) {
879            if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
880                connector.removePeerBroker(info);
881                connector.updateClientClusterInfo();
882            }
883        }
884    }
885
886    protected void purgeInactiveDestinations() {
887        inactiveDestinationsPurgeLock.writeLock().lock();
888        try {
889            List<Destination> list = new ArrayList<Destination>();
890            Map<ActiveMQDestination, Destination> map = getDestinationMap();
891            if (isAllowTempAutoCreationOnSend()) {
892                map.putAll(tempQueueRegion.getDestinationMap());
893                map.putAll(tempTopicRegion.getDestinationMap());
894            }
895            long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
896            long timeStamp = System.currentTimeMillis();
897            for (Destination d : map.values()) {
898                d.markForGC(timeStamp);
899                if (d.canGC()) {
900                    list.add(d);
901                    if (maxPurgedDests > 0 && list.size() == maxPurgedDests) {
902                        break;
903                    }
904                }
905            }
906
907            if (!list.isEmpty()) {
908                ConnectionContext context = BrokerSupport.getConnectionContext(this);
909                context.setBroker(this);
910
911                for (Destination dest : list) {
912                    Logger log = LOG;
913                    if (dest instanceof BaseDestination) {
914                        log = ((BaseDestination) dest).getLog();
915                    }
916                    log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimeoutBeforeGC());
917                    try {
918                        getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
919                    } catch (Exception e) {
920                        LOG.error("Failed to remove inactive destination {}", dest, e);
921                    }
922                }
923            }
924        } finally {
925            inactiveDestinationsPurgeLock.writeLock().unlock();
926        }
927    }
928
929    public boolean isAllowTempAutoCreationOnSend() {
930        return allowTempAutoCreationOnSend;
931    }
932
933    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
934        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
935    }
936
937    @Override
938    public void reapplyInterceptor() {
939        queueRegion.reapplyInterceptor();
940        topicRegion.reapplyInterceptor();
941        tempQueueRegion.reapplyInterceptor();
942        tempTopicRegion.reapplyInterceptor();
943    }
944}