001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.concurrent.CancellationException;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.CopyOnWriteArrayList;
027    import java.util.concurrent.Future;
028    import java.util.concurrent.locks.ReentrantReadWriteLock;
029    
030    import org.apache.activemq.advisory.AdvisorySupport;
031    import org.apache.activemq.broker.BrokerService;
032    import org.apache.activemq.broker.ConnectionContext;
033    import org.apache.activemq.broker.ProducerBrokerExchange;
034    import org.apache.activemq.broker.region.policy.DispatchPolicy;
035    import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
036    import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
037    import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
038    import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
039    import org.apache.activemq.broker.util.InsertionCountList;
040    import org.apache.activemq.command.ActiveMQDestination;
041    import org.apache.activemq.command.ExceptionResponse;
042    import org.apache.activemq.command.Message;
043    import org.apache.activemq.command.MessageAck;
044    import org.apache.activemq.command.MessageId;
045    import org.apache.activemq.command.ProducerAck;
046    import org.apache.activemq.command.ProducerInfo;
047    import org.apache.activemq.command.Response;
048    import org.apache.activemq.command.SubscriptionInfo;
049    import org.apache.activemq.filter.MessageEvaluationContext;
050    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
051    import org.apache.activemq.store.MessageRecoveryListener;
052    import org.apache.activemq.store.TopicMessageStore;
053    import org.apache.activemq.thread.Task;
054    import org.apache.activemq.thread.TaskRunner;
055    import org.apache.activemq.thread.TaskRunnerFactory;
056    import org.apache.activemq.transaction.Synchronization;
057    import org.apache.activemq.util.SubscriptionKey;
058    import org.slf4j.Logger;
059    import org.slf4j.LoggerFactory;
060    
061    /**
062     * The Topic is a destination that sends a copy of a message to every active
063     * Subscription registered.
064     */
065    public class Topic extends BaseDestination implements Task {
066        protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
067        private final TopicMessageStore topicStore;
068        protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
069        private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
070        private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
071        private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
072        private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
073        private final TaskRunner taskRunner;
074        private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
075        private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
076            public void run() {
077                try {
078                    Topic.this.taskRunner.wakeup();
079                } catch (InterruptedException e) {
080                }
081            };
082        };
083    
084        public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
085                DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
086            super(brokerService, store, destination, parentStats);
087            this.topicStore = store;
088            // set default subscription recovery policy
089            if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
090                subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
091                setAlwaysRetroactive(true);
092            } else {
093                subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
094            }
095            this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
096        }
097    
098        @Override
099        public void initialize() throws Exception {
100            super.initialize();
101            if (store != null) {
102                // AMQ-2586: Better to leave this stat at zero than to give the user
103                // misleading metrics.
104                // int messageCount = store.getMessageCount();
105                // destinationStatistics.getMessages().setCount(messageCount);
106            }
107        }
108    
109        public List<Subscription> getConsumers() {
110            synchronized (consumers) {
111                return new ArrayList<Subscription>(consumers);
112            }
113        }
114    
115        public boolean lock(MessageReference node, LockOwner sub) {
116            return true;
117        }
118    
119        public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
120            if (!sub.getConsumerInfo().isDurable()) {
121    
122                // Do a retroactive recovery if needed.
123                if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
124    
125                    // synchronize with dispatch method so that no new messages are sent
126                    // while we are recovering a subscription to avoid out of order messages.
127                    dispatchLock.writeLock().lock();
128                    try {
129                        boolean applyRecovery = false;
130                        synchronized (consumers) {
131                            if (!consumers.contains(sub)){
132                                sub.add(context, this);
133                                consumers.add(sub);
134                                applyRecovery=true;
135                                super.addSubscription(context, sub);
136                            }
137                        }
138                        if (applyRecovery){
139                            subscriptionRecoveryPolicy.recover(context, this, sub);
140                        }
141                    } finally {
142                        dispatchLock.writeLock().unlock();
143                    }
144    
145                } else {
146                    synchronized (consumers) {
147                        if (!consumers.contains(sub)){
148                            sub.add(context, this);
149                            consumers.add(sub);
150                            super.addSubscription(context, sub);
151                        }
152                    }
153                }
154            } else {
155                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
156                super.addSubscription(context, sub);
157                sub.add(context, this);
158                if(dsub.isActive()) {
159                    synchronized (consumers) {
160                        boolean hasSubscription = false;
161    
162                        if (consumers.size() == 0) {
163                            hasSubscription = false;
164                        } else {
165                            for (Subscription currentSub : consumers) {
166                                if (currentSub.getConsumerInfo().isDurable()) {
167                                    DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
168                                    if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
169                                        hasSubscription = true;
170                                        break;
171                                    }
172                                }
173                            }
174                        }
175    
176                        if (!hasSubscription) {
177                            consumers.add(sub);
178                        }
179                    }
180                }
181                durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
182            }
183        }
184    
185        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
186            if (!sub.getConsumerInfo().isDurable()) {
187                super.removeSubscription(context, sub, lastDeliveredSequenceId);
188                synchronized (consumers) {
189                    consumers.remove(sub);
190                }
191            }
192            sub.remove(context, this);
193        }
194    
195        public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
196            if (topicStore != null) {
197                topicStore.deleteSubscription(key.clientId, key.subscriptionName);
198                DurableTopicSubscription removed = durableSubscribers.remove(key);
199                if (removed != null) {
200                    destinationStatistics.getConsumers().decrement();
201                    // deactivate and remove
202                    removed.deactivate(false);
203                    consumers.remove(removed);
204                }
205            }
206        }
207    
208        public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
209            // synchronize with dispatch method so that no new messages are sent
210            // while we are recovering a subscription to avoid out of order messages.
211            dispatchLock.writeLock().lock();
212            try {
213    
214                if (topicStore == null) {
215                    return;
216                }
217    
218                // Recover the durable subscription.
219                String clientId = subscription.getSubscriptionKey().getClientId();
220                String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
221                String selector = subscription.getConsumerInfo().getSelector();
222                SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
223                if (info != null) {
224                    // Check to see if selector changed.
225                    String s1 = info.getSelector();
226                    if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
227                        // Need to delete the subscription
228                        topicStore.deleteSubscription(clientId, subscriptionName);
229                        info = null;
230                        synchronized (consumers) {
231                            consumers.remove(subscription);
232                        }
233                    } else {
234                        synchronized (consumers) {
235                            if (!consumers.contains(subscription)) {
236                                    consumers.add(subscription);
237                            }
238                        }
239                    }
240                }
241    
242                // Do we need to create the subscription?
243                if (info == null) {
244                    info = new SubscriptionInfo();
245                    info.setClientId(clientId);
246                    info.setSelector(selector);
247                    info.setSubscriptionName(subscriptionName);
248                    info.setDestination(getActiveMQDestination());
249                    // This destination is an actual destination id.
250                    info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
251                    // This destination might be a pattern
252                    synchronized (consumers) {
253                        consumers.add(subscription);
254                        topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
255                    }
256                }
257    
258                final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
259                msgContext.setDestination(destination);
260                if (subscription.isRecoveryRequired()) {
261                    topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
262                        public boolean recoverMessage(Message message) throws Exception {
263                            message.setRegionDestination(Topic.this);
264                            try {
265                                msgContext.setMessageReference(message);
266                                if (subscription.matches(message, msgContext)) {
267                                    subscription.add(message);
268                                }
269                            } catch (IOException e) {
270                                LOG.error("Failed to recover this message " + message);
271                            }
272                            return true;
273                        }
274    
275                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
276                            throw new RuntimeException("Should not be called.");
277                        }
278    
279                        public boolean hasSpace() {
280                            return true;
281                        }
282    
283                        public boolean isDuplicate(MessageId id) {
284                            return false;
285                        }
286                    });
287                }
288            } finally {
289                dispatchLock.writeLock().unlock();
290            }
291        }
292    
293        public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
294            synchronized (consumers) {
295                consumers.remove(sub);
296            }
297            sub.remove(context, this);
298        }
299    
300        protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
301            if (subscription.getConsumerInfo().isRetroactive()) {
302                subscriptionRecoveryPolicy.recover(context, this, subscription);
303            }
304        }
305    
306        public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
307            final ConnectionContext context = producerExchange.getConnectionContext();
308    
309            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
310            final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
311                    && !context.isInRecoveryMode();
312    
313            // There is delay between the client sending it and it arriving at the
314            // destination.. it may have expired.
315            if (message.isExpired()) {
316                broker.messageExpired(context, message, null);
317                getDestinationStatistics().getExpired().increment();
318                if (sendProducerAck) {
319                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
320                    context.getConnection().dispatchAsync(ack);
321                }
322                return;
323            }
324    
325            if (memoryUsage.isFull()) {
326                isFull(context, memoryUsage);
327                fastProducer(context, producerInfo);
328    
329                if (isProducerFlowControl() && context.isProducerFlowControl()) {
330    
331                    if (warnOnProducerFlowControl) {
332                        warnOnProducerFlowControl = false;
333                        LOG.info(memoryUsage + ", Usage Manager memory limit reached for "
334                                        + getActiveMQDestination().getQualifiedName()
335                                        + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
336                                        + " See http://activemq.apache.org/producer-flow-control.html for more info");
337                    }
338    
339                    if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
340                        throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
341                                + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
342                                + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
343                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
344                    }
345    
346                    // We can avoid blocking due to low usage if the producer is sending a sync message or
347                    // if it is using a producer window
348                    if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
349                        synchronized (messagesWaitingForSpace) {
350                            messagesWaitingForSpace.add(new Runnable() {
351                                public void run() {
352                                    try {
353    
354                                        // While waiting for space to free up... the
355                                        // message may have expired.
356                                        if (message.isExpired()) {
357                                            broker.messageExpired(context, message, null);
358                                            getDestinationStatistics().getExpired().increment();
359                                        } else {
360                                            doMessageSend(producerExchange, message);
361                                        }
362    
363                                        if (sendProducerAck) {
364                                            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
365                                                    .getSize());
366                                            context.getConnection().dispatchAsync(ack);
367                                        } else {
368                                            Response response = new Response();
369                                            response.setCorrelationId(message.getCommandId());
370                                            context.getConnection().dispatchAsync(response);
371                                        }
372    
373                                    } catch (Exception e) {
374                                        if (!sendProducerAck && !context.isInRecoveryMode()) {
375                                            ExceptionResponse response = new ExceptionResponse(e);
376                                            response.setCorrelationId(message.getCommandId());
377                                            context.getConnection().dispatchAsync(response);
378                                        }
379                                    }
380    
381                                }
382                            });
383    
384                            registerCallbackForNotFullNotification();
385                            context.setDontSendReponse(true);
386                            return;
387                        }
388    
389                    } else {
390                        // Producer flow control cannot be used, so we have do the flow control
391                        // at the broker by blocking this thread until there is space available.
392    
393                        if (memoryUsage.isFull()) {
394                            if (context.isInTransaction()) {
395    
396                                int count = 0;
397                                while (!memoryUsage.waitForSpace(1000)) {
398                                    if (context.getStopping().get()) {
399                                        throw new IOException("Connection closed, send aborted.");
400                                    }
401                                    if (count > 2 && context.isInTransaction()) {
402                                        count = 0;
403                                        int size = context.getTransaction().size();
404                                        LOG.warn("Waiting for space to send  transacted message - transaction elements = "
405                                                + size + " need more space to commit. Message = " + message);
406                                    }
407                                    count++;
408                                }
409                            } else {
410                                waitForSpace(
411                                        context,
412                                        memoryUsage,
413                                        "Usage Manager Memory Usage limit reached. Stopping producer ("
414                                                + message.getProducerId()
415                                                + ") to prevent flooding "
416                                                + getActiveMQDestination().getQualifiedName()
417                                                + "."
418                                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
419                            }
420                        }
421    
422                        // The usage manager could have delayed us by the time
423                        // we unblock the message could have expired..
424                        if (message.isExpired()) {
425                            getDestinationStatistics().getExpired().increment();
426                            if (LOG.isDebugEnabled()) {
427                                LOG.debug("Expired message: " + message);
428                            }
429                            return;
430                        }
431                    }
432                }
433            }
434    
435            doMessageSend(producerExchange, message);
436            messageDelivered(context, message);
437            if (sendProducerAck) {
438                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
439                context.getConnection().dispatchAsync(ack);
440            }
441        }
442    
443        /**
444         * do send the message - this needs to be synchronized to ensure messages
445         * are stored AND dispatched in the right order
446         *
447         * @param producerExchange
448         * @param message
449         * @throws IOException
450         * @throws Exception
451         */
452        synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
453                throws IOException, Exception {
454            final ConnectionContext context = producerExchange.getConnectionContext();
455            message.setRegionDestination(this);
456            message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
457            Future<Object> result = null;
458    
459            if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
460                if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
461                    final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
462                            + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
463                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
464                            + " See http://activemq.apache.org/producer-flow-control.html for more info";
465                    if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
466                        throw new javax.jms.ResourceAllocationException(logMessage);
467                    }
468    
469                    waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
470                }
471                result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
472            }
473    
474            message.incrementReferenceCount();
475    
476            if (context.isInTransaction()) {
477                context.getTransaction().addSynchronization(new Synchronization() {
478                    @Override
479                    public void afterCommit() throws Exception {
480                        // It could take while before we receive the commit
481                        // operation.. by that time the message could have
482                        // expired..
483                        if (broker.isExpired(message)) {
484                            getDestinationStatistics().getExpired().increment();
485                            broker.messageExpired(context, message, null);
486                            message.decrementReferenceCount();
487                            return;
488                        }
489                        try {
490                            dispatch(context, message);
491                        } finally {
492                            message.decrementReferenceCount();
493                        }
494                    }
495                });
496    
497            } else {
498                try {
499                    dispatch(context, message);
500                } finally {
501                    message.decrementReferenceCount();
502                }
503            }
504    
505            if (result != null && !result.isCancelled()) {
506                try {
507                    result.get();
508                } catch (CancellationException e) {
509                    // ignore - the task has been cancelled if the message
510                    // has already been deleted
511                }
512            }
513        }
514    
515        private boolean canOptimizeOutPersistence() {
516            return durableSubscribers.size() == 0;
517        }
518    
519        @Override
520        public String toString() {
521            return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
522        }
523    
524        public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
525                final MessageReference node) throws IOException {
526            if (topicStore != null && node.isPersistent()) {
527                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
528                SubscriptionKey key = dsub.getSubscriptionKey();
529                topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
530                        convertToNonRangedAck(ack, node));
531            }
532            messageConsumed(context, node);
533        }
534    
535        public void gc() {
536        }
537    
538        public Message loadMessage(MessageId messageId) throws IOException {
539            return topicStore != null ? topicStore.getMessage(messageId) : null;
540        }
541    
542        public void start() throws Exception {
543            this.subscriptionRecoveryPolicy.start();
544            if (memoryUsage != null) {
545                memoryUsage.start();
546            }
547    
548            if (getExpireMessagesPeriod() > 0) {
549                scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
550            }
551        }
552    
553        public void stop() throws Exception {
554            if (taskRunner != null) {
555                taskRunner.shutdown();
556            }
557            this.subscriptionRecoveryPolicy.stop();
558            if (memoryUsage != null) {
559                memoryUsage.stop();
560            }
561            if (this.topicStore != null) {
562                this.topicStore.stop();
563            }
564    
565             scheduler.cancel(expireMessagesTask);
566        }
567    
568        public Message[] browse() {
569            final List<Message> result = new ArrayList<Message>();
570            doBrowse(result, getMaxBrowsePageSize());
571            return result.toArray(new Message[result.size()]);
572        }
573    
574        private void doBrowse(final List<Message> browseList, final int max) {
575            try {
576                if (topicStore != null) {
577                    final List<Message> toExpire = new ArrayList<Message>();
578                    topicStore.recover(new MessageRecoveryListener() {
579                        public boolean recoverMessage(Message message) throws Exception {
580                            if (message.isExpired()) {
581                                toExpire.add(message);
582                            }
583                            browseList.add(message);
584                            return true;
585                        }
586    
587                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
588                            return true;
589                        }
590    
591                        public boolean hasSpace() {
592                            return browseList.size() < max;
593                        }
594    
595                        public boolean isDuplicate(MessageId id) {
596                            return false;
597                        }
598                    });
599                    final ConnectionContext connectionContext = createConnectionContext();
600                    for (Message message : toExpire) {
601                        for (DurableTopicSubscription sub : durableSubscribers.values()) {
602                            if (!sub.isActive()) {
603                                messageExpired(connectionContext, sub, message);
604                            }
605                        }
606                    }
607                    Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
608                    if (msgs != null) {
609                        for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
610                            browseList.add(msgs[i]);
611                        }
612                    }
613                }
614            } catch (Throwable e) {
615                LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
616            }
617        }
618    
619        public boolean iterate() {
620            synchronized (messagesWaitingForSpace) {
621                while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
622                    Runnable op = messagesWaitingForSpace.removeFirst();
623                    op.run();
624                }
625    
626                if (!messagesWaitingForSpace.isEmpty()) {
627                    registerCallbackForNotFullNotification();
628                }
629            }
630            return false;
631        }
632    
633        private void registerCallbackForNotFullNotification() {
634            // If the usage manager is not full, then the task will not
635            // get called..
636            if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
637                // so call it directly here.
638                sendMessagesWaitingForSpaceTask.run();
639            }
640        }
641    
642        // Properties
643        // -------------------------------------------------------------------------
644    
645        public DispatchPolicy getDispatchPolicy() {
646            return dispatchPolicy;
647        }
648    
649        public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
650            this.dispatchPolicy = dispatchPolicy;
651        }
652    
653        public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
654            return subscriptionRecoveryPolicy;
655        }
656    
657        public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
658            this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
659        }
660    
661        // Implementation methods
662        // -------------------------------------------------------------------------
663    
664        public final void wakeup() {
665        }
666    
667        protected void dispatch(final ConnectionContext context, Message message) throws Exception {
668            // AMQ-2586: Better to leave this stat at zero than to give the user
669            // misleading metrics.
670            // destinationStatistics.getMessages().increment();
671            destinationStatistics.getEnqueues().increment();
672            MessageEvaluationContext msgContext = null;
673    
674            dispatchLock.readLock().lock();
675            try {
676                if (!subscriptionRecoveryPolicy.add(context, message)) {
677                    return;
678                }
679                synchronized (consumers) {
680                    if (consumers.isEmpty()) {
681                        onMessageWithNoConsumers(context, message);
682                        return;
683                    }
684                }
685                msgContext = context.getMessageEvaluationContext();
686                msgContext.setDestination(destination);
687                msgContext.setMessageReference(message);
688                if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
689                    onMessageWithNoConsumers(context, message);
690                }
691    
692            } finally {
693                dispatchLock.readLock().unlock();
694                if (msgContext != null) {
695                    msgContext.clear();
696                }
697            }
698        }
699    
700        private final Runnable expireMessagesTask = new Runnable() {
701            public void run() {
702                List<Message> browsedMessages = new InsertionCountList<Message>();
703                doBrowse(browsedMessages, getMaxExpirePageSize());
704            }
705        };
706    
707        public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
708            broker.messageExpired(context, reference, subs);
709            // AMQ-2586: Better to leave this stat at zero than to give the user
710            // misleading metrics.
711            // destinationStatistics.getMessages().decrement();
712            destinationStatistics.getEnqueues().decrement();
713            destinationStatistics.getExpired().increment();
714            MessageAck ack = new MessageAck();
715            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
716            ack.setDestination(destination);
717            ack.setMessageID(reference.getMessageId());
718            try {
719                if (subs instanceof DurableTopicSubscription) {
720                    ((DurableTopicSubscription)subs).removePending(reference);
721                }
722                acknowledge(context, subs, ack, reference);
723            } catch (Exception e) {
724                LOG.error("Failed to remove expired Message from the store ", e);
725            }
726        }
727    
728        @Override
729        protected Logger getLog() {
730            return LOG;
731        }
732    
733        protected boolean isOptimizeStorage(){
734            boolean result = false;
735    
736            if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
737                    result = true;
738                    for (DurableTopicSubscription s : durableSubscribers.values()) {
739                        if (s.isActive()== false){
740                            result = false;
741                            break;
742                        }
743                        if (s.getPrefetchSize()==0){
744                            result = false;
745                            break;
746                        }
747                        if (s.isSlowConsumer()){
748                            result = false;
749                            break;
750                        }
751                        if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
752                            result = false;
753                            break;
754                        }
755                    }
756            }
757            return result;
758        }
759    
760        /**
761         * force a reread of the store - after transaction recovery completion
762         */
763        public void clearPendingMessages() {
764            dispatchLock.readLock().lock();
765            try {
766                for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
767                    clearPendingAndDispatch(durableTopicSubscription);
768                }
769            } finally {
770                dispatchLock.readLock().unlock();
771            }
772        }
773    
774        private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
775            synchronized (durableTopicSubscription.pendingLock) {
776                durableTopicSubscription.pending.clear();
777                try {
778                    durableTopicSubscription.dispatchPending();
779                } catch (IOException exception) {
780                    LOG.warn("After clear of pending, failed to dispatch to: " +
781                            durableTopicSubscription + ", for :" + destination + ", pending: " +
782                            durableTopicSubscription.pending, exception);
783                }
784            }
785        }
786    
787        public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
788            return durableSubscribers;
789        }
790    }