001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.concurrent.CancellationException;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.CopyOnWriteArrayList;
027    import java.util.concurrent.Future;
028    import java.util.concurrent.locks.ReentrantReadWriteLock;
029    
030    import org.apache.activemq.broker.BrokerService;
031    import org.apache.activemq.broker.ConnectionContext;
032    import org.apache.activemq.broker.ProducerBrokerExchange;
033    import org.apache.activemq.broker.region.policy.DispatchPolicy;
034    import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
035    import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
036    import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
037    import org.apache.activemq.broker.util.InsertionCountList;
038    import org.apache.activemq.command.ActiveMQDestination;
039    import org.apache.activemq.command.ExceptionResponse;
040    import org.apache.activemq.command.Message;
041    import org.apache.activemq.command.MessageAck;
042    import org.apache.activemq.command.MessageId;
043    import org.apache.activemq.command.ProducerAck;
044    import org.apache.activemq.command.ProducerInfo;
045    import org.apache.activemq.command.Response;
046    import org.apache.activemq.command.SubscriptionInfo;
047    import org.apache.activemq.filter.MessageEvaluationContext;
048    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
049    import org.apache.activemq.store.MessageRecoveryListener;
050    import org.apache.activemq.store.TopicMessageStore;
051    import org.apache.activemq.thread.Task;
052    import org.apache.activemq.thread.TaskRunner;
053    import org.apache.activemq.thread.TaskRunnerFactory;
054    import org.apache.activemq.transaction.Synchronization;
055    import org.apache.activemq.util.SubscriptionKey;
056    import org.slf4j.Logger;
057    import org.slf4j.LoggerFactory;
058    
059    /**
060     * The Topic is a destination that sends a copy of a message to every active
061     * Subscription registered.
062     */
063    public class Topic extends BaseDestination implements Task {
064        protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
065        private final TopicMessageStore topicStore;
066        protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
067        private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
068        private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
069        private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
070        private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
071        private final TaskRunner taskRunner;
072        private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
073        private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
074            public void run() {
075                try {
076                    Topic.this.taskRunner.wakeup();
077                } catch (InterruptedException e) {
078                }
079            };
080        };
081    
082        public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
083                DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
084            super(brokerService, store, destination, parentStats);
085            this.topicStore = store;
086            // set default subscription recovery policy
087            subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
088            this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
089        }
090    
091        @Override
092        public void initialize() throws Exception {
093            super.initialize();
094            if (store != null) {
095                // AMQ-2586: Better to leave this stat at zero than to give the user
096                // misleading metrics.
097                // int messageCount = store.getMessageCount();
098                // destinationStatistics.getMessages().setCount(messageCount);
099            }
100        }
101    
102        public List<Subscription> getConsumers() {
103            synchronized (consumers) {
104                return new ArrayList<Subscription>(consumers);
105            }
106        }
107    
108        public boolean lock(MessageReference node, LockOwner sub) {
109            return true;
110        }
111    
112        public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
113            if (!sub.getConsumerInfo().isDurable()) {
114    
115                // Do a retroactive recovery if needed.
116                if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
117    
118                    // synchronize with dispatch method so that no new messages are sent
119                    // while we are recovering a subscription to avoid out of order messages.
120                    dispatchLock.writeLock().lock();
121                    try {
122                        boolean applyRecovery = false;
123                        synchronized (consumers) {
124                            if (!consumers.contains(sub)){
125                                sub.add(context, this);
126                                consumers.add(sub);
127                                applyRecovery=true;
128                                super.addSubscription(context, sub);
129                            }
130                        }
131                        if (applyRecovery){
132                            subscriptionRecoveryPolicy.recover(context, this, sub);
133                        }
134                    } finally {
135                        dispatchLock.writeLock().unlock();
136                    }
137    
138                } else {
139                    synchronized (consumers) {
140                        if (!consumers.contains(sub)){
141                            sub.add(context, this);
142                            consumers.add(sub);
143                            super.addSubscription(context, sub);
144                        }
145                    }
146                }
147            } else {
148                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
149                super.addSubscription(context, sub);
150                sub.add(context, this);
151                if(dsub.isActive()) {
152                    synchronized (consumers) {
153                        boolean hasSubscription = false;
154    
155                        if (consumers.size() == 0) {
156                            hasSubscription = false;
157                        } else {
158                            for (Subscription currentSub : consumers) {
159                                if (currentSub.getConsumerInfo().isDurable()) {
160                                    DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
161                                    if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
162                                        hasSubscription = true;
163                                        break;
164                                    }
165                                }
166                            }
167                        }
168    
169                        if (!hasSubscription) {
170                            consumers.add(sub);
171                        }
172                    }
173                }
174                durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
175            }
176        }
177    
178        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
179            if (!sub.getConsumerInfo().isDurable()) {
180                super.removeSubscription(context, sub, lastDeliveredSequenceId);
181                synchronized (consumers) {
182                    consumers.remove(sub);
183                }
184            }
185            sub.remove(context, this);
186        }
187    
188        public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
189            if (topicStore != null) {
190                topicStore.deleteSubscription(key.clientId, key.subscriptionName);
191                DurableTopicSubscription removed = durableSubcribers.remove(key);
192                if (removed != null) {
193                    destinationStatistics.getConsumers().decrement();
194                    // deactivate and remove
195                    removed.deactivate(false);
196                    consumers.remove(removed);
197                }
198            }
199        }
200    
201        public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
202            // synchronize with dispatch method so that no new messages are sent
203            // while we are recovering a subscription to avoid out of order messages.
204            dispatchLock.writeLock().lock();
205            try {
206    
207                if (topicStore == null) {
208                    return;
209                }
210    
211                // Recover the durable subscription.
212                String clientId = subscription.getSubscriptionKey().getClientId();
213                String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
214                String selector = subscription.getConsumerInfo().getSelector();
215                SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
216                if (info != null) {
217                    // Check to see if selector changed.
218                    String s1 = info.getSelector();
219                    if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
220                        // Need to delete the subscription
221                        topicStore.deleteSubscription(clientId, subscriptionName);
222                        info = null;
223                        synchronized (consumers) {
224                            consumers.remove(subscription);
225                        }
226                    } else {
227                        synchronized (consumers) {
228                            if (!consumers.contains(subscription)) {
229                                    consumers.add(subscription);
230                            }
231                        }
232                    }
233                }
234    
235                // Do we need to create the subscription?
236                if (info == null) {
237                    info = new SubscriptionInfo();
238                    info.setClientId(clientId);
239                    info.setSelector(selector);
240                    info.setSubscriptionName(subscriptionName);
241                    info.setDestination(getActiveMQDestination());
242                    // This destination is an actual destination id.
243                    info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
244                    // This destination might be a pattern
245                    synchronized (consumers) {
246                        consumers.add(subscription);
247                        topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
248                    }
249                }
250    
251                final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
252                msgContext.setDestination(destination);
253                if (subscription.isRecoveryRequired()) {
254                    topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
255                        public boolean recoverMessage(Message message) throws Exception {
256                            message.setRegionDestination(Topic.this);
257                            try {
258                                msgContext.setMessageReference(message);
259                                if (subscription.matches(message, msgContext)) {
260                                    subscription.add(message);
261                                }
262                            } catch (IOException e) {
263                                LOG.error("Failed to recover this message " + message);
264                            }
265                            return true;
266                        }
267    
268                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
269                            throw new RuntimeException("Should not be called.");
270                        }
271    
272                        public boolean hasSpace() {
273                            return true;
274                        }
275    
276                        public boolean isDuplicate(MessageId id) {
277                            return false;
278                        }
279                    });
280                }
281            } finally {
282                dispatchLock.writeLock().unlock();
283            }
284        }
285    
286        public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
287            synchronized (consumers) {
288                consumers.remove(sub);
289            }
290            sub.remove(context, this);
291        }
292    
293        protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
294            if (subscription.getConsumerInfo().isRetroactive()) {
295                subscriptionRecoveryPolicy.recover(context, this, subscription);
296            }
297        }
298    
299        public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
300            final ConnectionContext context = producerExchange.getConnectionContext();
301    
302            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
303            final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
304                    && !context.isInRecoveryMode();
305    
306            // There is delay between the client sending it and it arriving at the
307            // destination.. it may have expired.
308            if (message.isExpired()) {
309                broker.messageExpired(context, message, null);
310                getDestinationStatistics().getExpired().increment();
311                if (sendProducerAck) {
312                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
313                    context.getConnection().dispatchAsync(ack);
314                }
315                return;
316            }
317    
318            if (memoryUsage.isFull()) {
319                isFull(context, memoryUsage);
320                fastProducer(context, producerInfo);
321    
322                if (isProducerFlowControl() && context.isProducerFlowControl()) {
323    
324                    if (warnOnProducerFlowControl) {
325                        warnOnProducerFlowControl = false;
326                        LOG.info(memoryUsage + ", Usage Manager memory limit reached for "
327                                        + getActiveMQDestination().getQualifiedName()
328                                        + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
329                                        + " See http://activemq.apache.org/producer-flow-control.html for more info");
330                    }
331    
332                    if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
333                        throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
334                                + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
335                                + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
336                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
337                    }
338    
339                    // We can avoid blocking due to low usage if the producer is sending a sync message or
340                    // if it is using a producer window
341                    if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
342                        synchronized (messagesWaitingForSpace) {
343                            messagesWaitingForSpace.add(new Runnable() {
344                                public void run() {
345                                    try {
346    
347                                        // While waiting for space to free up... the
348                                        // message may have expired.
349                                        if (message.isExpired()) {
350                                            broker.messageExpired(context, message, null);
351                                            getDestinationStatistics().getExpired().increment();
352                                        } else {
353                                            doMessageSend(producerExchange, message);
354                                        }
355    
356                                        if (sendProducerAck) {
357                                            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
358                                                    .getSize());
359                                            context.getConnection().dispatchAsync(ack);
360                                        } else {
361                                            Response response = new Response();
362                                            response.setCorrelationId(message.getCommandId());
363                                            context.getConnection().dispatchAsync(response);
364                                        }
365    
366                                    } catch (Exception e) {
367                                        if (!sendProducerAck && !context.isInRecoveryMode()) {
368                                            ExceptionResponse response = new ExceptionResponse(e);
369                                            response.setCorrelationId(message.getCommandId());
370                                            context.getConnection().dispatchAsync(response);
371                                        }
372                                    }
373    
374                                }
375                            });
376    
377                            registerCallbackForNotFullNotification();
378                            context.setDontSendReponse(true);
379                            return;
380                        }
381    
382                    } else {
383                        // Producer flow control cannot be used, so we have do the flow control
384                        // at the broker by blocking this thread until there is space available.
385    
386                        if (memoryUsage.isFull()) {
387                            if (context.isInTransaction()) {
388    
389                                int count = 0;
390                                while (!memoryUsage.waitForSpace(1000)) {
391                                    if (context.getStopping().get()) {
392                                        throw new IOException("Connection closed, send aborted.");
393                                    }
394                                    if (count > 2 && context.isInTransaction()) {
395                                        count = 0;
396                                        int size = context.getTransaction().size();
397                                        LOG.warn("Waiting for space to send  transacted message - transaction elements = "
398                                                + size + " need more space to commit. Message = " + message);
399                                    }
400                                }
401                            } else {
402                                waitForSpace(
403                                        context,
404                                        memoryUsage,
405                                        "Usage Manager Memory Usage limit reached. Stopping producer ("
406                                                + message.getProducerId()
407                                                + ") to prevent flooding "
408                                                + getActiveMQDestination().getQualifiedName()
409                                                + "."
410                                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
411                            }
412                        }
413    
414                        // The usage manager could have delayed us by the time
415                        // we unblock the message could have expired..
416                        if (message.isExpired()) {
417                            getDestinationStatistics().getExpired().increment();
418                            if (LOG.isDebugEnabled()) {
419                                LOG.debug("Expired message: " + message);
420                            }
421                            return;
422                        }
423                    }
424                }
425            }
426    
427            doMessageSend(producerExchange, message);
428            messageDelivered(context, message);
429            if (sendProducerAck) {
430                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
431                context.getConnection().dispatchAsync(ack);
432            }
433        }
434    
435        /**
436         * do send the message - this needs to be synchronized to ensure messages
437         * are stored AND dispatched in the right order
438         *
439         * @param producerExchange
440         * @param message
441         * @throws IOException
442         * @throws Exception
443         */
444        synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
445                throws IOException, Exception {
446            final ConnectionContext context = producerExchange.getConnectionContext();
447            message.setRegionDestination(this);
448            message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
449            Future<Object> result = null;
450    
451            if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
452                if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
453                    final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
454                            + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
455                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
456                            + " See http://activemq.apache.org/producer-flow-control.html for more info";
457                    if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
458                        throw new javax.jms.ResourceAllocationException(logMessage);
459                    }
460    
461                    waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
462                }
463                result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
464            }
465    
466            message.incrementReferenceCount();
467    
468            if (context.isInTransaction()) {
469                context.getTransaction().addSynchronization(new Synchronization() {
470                    @Override
471                    public void afterCommit() throws Exception {
472                        // It could take while before we receive the commit
473                        // operation.. by that time the message could have
474                        // expired..
475                        if (broker.isExpired(message)) {
476                            getDestinationStatistics().getExpired().increment();
477                            broker.messageExpired(context, message, null);
478                            message.decrementReferenceCount();
479                            return;
480                        }
481                        try {
482                            dispatch(context, message);
483                        } finally {
484                            message.decrementReferenceCount();
485                        }
486                    }
487                });
488    
489            } else {
490                try {
491                    dispatch(context, message);
492                } finally {
493                    message.decrementReferenceCount();
494                }
495            }
496    
497            if (result != null && !result.isCancelled()) {
498                try {
499                    result.get();
500                } catch (CancellationException e) {
501                    // ignore - the task has been cancelled if the message
502                    // has already been deleted
503                }
504            }
505        }
506    
507        private boolean canOptimizeOutPersistence() {
508            return durableSubcribers.size() == 0;
509        }
510    
511        @Override
512        public String toString() {
513            return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
514        }
515    
516        public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
517                final MessageReference node) throws IOException {
518            if (topicStore != null && node.isPersistent()) {
519                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
520                SubscriptionKey key = dsub.getSubscriptionKey();
521                topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
522                        convertToNonRangedAck(ack, node));
523            }
524            messageConsumed(context, node);
525        }
526    
527        public void gc() {
528        }
529    
530        public Message loadMessage(MessageId messageId) throws IOException {
531            return topicStore != null ? topicStore.getMessage(messageId) : null;
532        }
533    
534        public void start() throws Exception {
535            this.subscriptionRecoveryPolicy.start();
536            if (memoryUsage != null) {
537                memoryUsage.start();
538            }
539    
540            if (getExpireMessagesPeriod() > 0) {
541                scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
542            }
543        }
544    
545        public void stop() throws Exception {
546            if (taskRunner != null) {
547                taskRunner.shutdown();
548            }
549            this.subscriptionRecoveryPolicy.stop();
550            if (memoryUsage != null) {
551                memoryUsage.stop();
552            }
553            if (this.topicStore != null) {
554                this.topicStore.stop();
555            }
556    
557             scheduler.cancel(expireMessagesTask);
558        }
559    
560        public Message[] browse() {
561            final List<Message> result = new ArrayList<Message>();
562            doBrowse(result, getMaxBrowsePageSize());
563            return result.toArray(new Message[result.size()]);
564        }
565    
566        private void doBrowse(final List<Message> browseList, final int max) {
567            try {
568                if (topicStore != null) {
569                    final List<Message> toExpire = new ArrayList<Message>();
570                    topicStore.recover(new MessageRecoveryListener() {
571                        public boolean recoverMessage(Message message) throws Exception {
572                            if (message.isExpired()) {
573                                toExpire.add(message);
574                            }
575                            browseList.add(message);
576                            return true;
577                        }
578    
579                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
580                            return true;
581                        }
582    
583                        public boolean hasSpace() {
584                            return browseList.size() < max;
585                        }
586    
587                        public boolean isDuplicate(MessageId id) {
588                            return false;
589                        }
590                    });
591                    final ConnectionContext connectionContext = createConnectionContext();
592                    for (Message message : toExpire) {
593                        for (DurableTopicSubscription sub : durableSubcribers.values()) {
594                            if (!sub.isActive()) {
595                                messageExpired(connectionContext, sub, message);
596                            }
597                        }
598                    }
599                    Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
600                    if (msgs != null) {
601                        for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
602                            browseList.add(msgs[i]);
603                        }
604                    }
605                }
606            } catch (Throwable e) {
607                LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
608            }
609        }
610    
611        public boolean iterate() {
612            synchronized (messagesWaitingForSpace) {
613                while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
614                    Runnable op = messagesWaitingForSpace.removeFirst();
615                    op.run();
616                }
617    
618                if (!messagesWaitingForSpace.isEmpty()) {
619                    registerCallbackForNotFullNotification();
620                }
621            }
622            return false;
623        }
624    
625        private void registerCallbackForNotFullNotification() {
626            // If the usage manager is not full, then the task will not
627            // get called..
628            if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
629                // so call it directly here.
630                sendMessagesWaitingForSpaceTask.run();
631            }
632        }
633    
634        // Properties
635        // -------------------------------------------------------------------------
636    
637        public DispatchPolicy getDispatchPolicy() {
638            return dispatchPolicy;
639        }
640    
641        public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
642            this.dispatchPolicy = dispatchPolicy;
643        }
644    
645        public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
646            return subscriptionRecoveryPolicy;
647        }
648    
649        public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
650            this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
651        }
652    
653        // Implementation methods
654        // -------------------------------------------------------------------------
655    
656        public final void wakeup() {
657        }
658    
659        protected void dispatch(final ConnectionContext context, Message message) throws Exception {
660            // AMQ-2586: Better to leave this stat at zero than to give the user
661            // misleading metrics.
662            // destinationStatistics.getMessages().increment();
663            destinationStatistics.getEnqueues().increment();
664            MessageEvaluationContext msgContext = null;
665    
666            dispatchLock.readLock().lock();
667            try {
668                if (!subscriptionRecoveryPolicy.add(context, message)) {
669                    return;
670                }
671                synchronized (consumers) {
672                    if (consumers.isEmpty()) {
673                        onMessageWithNoConsumers(context, message);
674                        return;
675                    }
676                }
677                msgContext = context.getMessageEvaluationContext();
678                msgContext.setDestination(destination);
679                msgContext.setMessageReference(message);
680                if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
681                    onMessageWithNoConsumers(context, message);
682                }
683    
684            } finally {
685                dispatchLock.readLock().unlock();
686                if (msgContext != null) {
687                    msgContext.clear();
688                }
689            }
690        }
691    
692        private final Runnable expireMessagesTask = new Runnable() {
693            public void run() {
694                List<Message> browsedMessages = new InsertionCountList<Message>();
695                doBrowse(browsedMessages, getMaxExpirePageSize());
696            }
697        };
698    
699        public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
700            broker.messageExpired(context, reference, subs);
701            // AMQ-2586: Better to leave this stat at zero than to give the user
702            // misleading metrics.
703            // destinationStatistics.getMessages().decrement();
704            destinationStatistics.getEnqueues().decrement();
705            destinationStatistics.getExpired().increment();
706            MessageAck ack = new MessageAck();
707            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
708            ack.setDestination(destination);
709            ack.setMessageID(reference.getMessageId());
710            try {
711                if (subs instanceof DurableTopicSubscription) {
712                    ((DurableTopicSubscription)subs).removePending(reference);
713                }
714                acknowledge(context, subs, ack, reference);
715            } catch (Exception e) {
716                LOG.error("Failed to remove expired Message from the store ", e);
717            }
718        }
719    
720        @Override
721        protected Logger getLog() {
722            return LOG;
723        }
724    
725        protected boolean isOptimizeStorage(){
726            boolean result = false;
727    
728            if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){
729                    result = true;
730                    for (DurableTopicSubscription s : durableSubcribers.values()) {
731                        if (s.isActive()== false){
732                            result = false;
733                            break;
734                        }
735                        if (s.getPrefetchSize()==0){
736                            result = false;
737                            break;
738                        }
739                        if (s.isSlowConsumer()){
740                            result = false;
741                            break;
742                        }
743                        if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
744                            result = false;
745                            break;
746                        }
747                    }
748            }
749            return result;
750        }
751    
752        /**
753         * force a reread of the store - after transaction recovery completion
754         */
755        public void clearPendingMessages() {
756            dispatchLock.readLock().lock();
757            try {
758                for (DurableTopicSubscription durableTopicSubscription : durableSubcribers.values()) {
759                    clearPendingAndDispatch(durableTopicSubscription);
760                }
761            } finally {
762                dispatchLock.readLock().unlock();
763            }
764        }
765    
766        private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
767            synchronized (durableTopicSubscription.pendingLock) {
768                durableTopicSubscription.pending.clear();
769                try {
770                    durableTopicSubscription.dispatchPending();
771                } catch (IOException exception) {
772                    LOG.warn("After clear of pending, failed to dispatch to: " +
773                            durableTopicSubscription + ", for :" + destination + ", pending: " +
774                            durableTopicSubscription.pending, exception);
775                }
776            }
777        }
778    
779        public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
780            return durableSubcribers;
781        }
782    }