001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicLong;
024
025import javax.jms.JMSException;
026
027import org.apache.activemq.ActiveMQMessageAudit;
028import org.apache.activemq.broker.Broker;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
031import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
033import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
034import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
035import org.apache.activemq.command.ConsumerControl;
036import org.apache.activemq.command.ConsumerInfo;
037import org.apache.activemq.command.Message;
038import org.apache.activemq.command.MessageAck;
039import org.apache.activemq.command.MessageDispatch;
040import org.apache.activemq.command.MessageDispatchNotification;
041import org.apache.activemq.command.MessageId;
042import org.apache.activemq.command.MessagePull;
043import org.apache.activemq.command.Response;
044import org.apache.activemq.thread.Scheduler;
045import org.apache.activemq.transaction.Synchronization;
046import org.apache.activemq.transport.TransmitCallback;
047import org.apache.activemq.usage.SystemUsage;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051public class TopicSubscription extends AbstractSubscription {
052
053    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
054    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
055
056    protected PendingMessageCursor matched;
057    protected final SystemUsage usageManager;
058    boolean singleDestination = true;
059    Destination destination;
060    private final Scheduler scheduler;
061
062    private int maximumPendingMessages = -1;
063    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
064    private int discarded;
065    private final Object matchedListMutex = new Object();
066    private int memoryUsageHighWaterMark = 95;
067    // allow duplicate suppression in a ring network of brokers
068    protected int maxProducersToAudit = 1024;
069    protected int maxAuditDepth = 1000;
070    protected boolean enableAudit = false;
071    protected ActiveMQMessageAudit audit;
072    protected boolean active = false;
073    protected boolean discarding = false;
074    private boolean useTopicSubscriptionInflightStats = true;
075
076    //Used for inflight message size calculations
077    protected final Object dispatchLock = new Object();
078    protected final List<DispatchedNode> dispatched = new ArrayList<>();
079
080    public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
081        super(broker, context, info);
082        this.usageManager = usageManager;
083        String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
084        if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) {
085            this.matched = new VMPendingMessageCursor(false);
086        } else {
087            this.matched = new FilePendingMessageCursor(broker,matchedName,false);
088        }
089
090        this.scheduler = broker.getScheduler();
091    }
092
093    public void init() throws Exception {
094        this.matched.setSystemUsage(usageManager);
095        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
096        this.matched.start();
097        if (enableAudit) {
098            audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
099        }
100        this.active=true;
101    }
102
103    @Override
104    public void add(MessageReference node) throws Exception {
105        if (isDuplicate(node)) {
106            return;
107        }
108        // Lets use an indirect reference so that we can associate a unique
109        // locator /w the message.
110        node = new IndirectMessageReference(node.getMessage());
111        getSubscriptionStatistics().getEnqueues().increment();
112        synchronized (matchedListMutex) {
113            // if this subscriber is already discarding a message, we don't want to add
114            // any more messages to it as those messages can only be advisories generated in the process,
115            // which can trigger the recursive call loop
116            if (discarding) return;
117
118            if (!isFull() && matched.isEmpty()) {
119                // if maximumPendingMessages is set we will only discard messages which
120                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
121                dispatch(node);
122                setSlowConsumer(false);
123            } else {
124                if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
125                    // Slow consumers should log and set their state as such.
126                    if (!isSlowConsumer()) {
127                        LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
128                        setSlowConsumer(true);
129                        for (Destination dest: destinations) {
130                            dest.slowConsumer(getContext(), this);
131                        }
132                    }
133                }
134                if (maximumPendingMessages != 0) {
135                    boolean warnedAboutWait = false;
136                    while (active) {
137                        while (matched.isFull()) {
138                            if (getContext().getStopping().get()) {
139                                LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
140                                getSubscriptionStatistics().getEnqueues().decrement();
141                                return;
142                            }
143                            if (!warnedAboutWait) {
144                                LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
145                                        new Object[]{
146                                                toString(),
147                                                matched,
148                                                matched.getSystemUsage().getTempUsage().getPercentUsage(),
149                                                matched.getSystemUsage().getMemoryUsage().getPercentUsage()
150                                        });
151                                warnedAboutWait = true;
152                            }
153                            matchedListMutex.wait(20);
154                        }
155                        // Temporary storage could be full - so just try to add the message
156                        // see https://issues.apache.org/activemq/browse/AMQ-2475
157                        if (matched.tryAddMessageLast(node, 10)) {
158                            break;
159                        }
160                    }
161                    if (maximumPendingMessages > 0) {
162                        // calculate the high water mark from which point we
163                        // will eagerly evict expired messages
164                        int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
165                        if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
166                            max = maximumPendingMessages;
167                        }
168                        if (!matched.isEmpty() && matched.size() > max) {
169                            removeExpiredMessages();
170                        }
171                        // lets discard old messages as we are a slow consumer
172                        while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
173                            int pageInSize = matched.size() - maximumPendingMessages;
174                            // only page in a 1000 at a time - else we could blow the memory
175                            pageInSize = Math.max(1000, pageInSize);
176                            LinkedList<MessageReference> list = null;
177                            MessageReference[] oldMessages=null;
178                            synchronized(matched){
179                                list = matched.pageInList(pageInSize);
180                                oldMessages = messageEvictionStrategy.evictMessages(list);
181                                for (MessageReference ref : list) {
182                                    ref.decrementReferenceCount();
183                                }
184                            }
185                            int messagesToEvict = 0;
186                            if (oldMessages != null){
187                                messagesToEvict = oldMessages.length;
188                                for (int i = 0; i < messagesToEvict; i++) {
189                                    MessageReference oldMessage = oldMessages[i];
190                                    discard(oldMessage);
191                                }
192                            }
193                            // lets avoid an infinite loop if we are given a bad eviction strategy
194                            // for a bad strategy lets just not evict
195                            if (messagesToEvict == 0) {
196                                LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{
197                                        destination, messageEvictionStrategy, list.size()
198                                });
199                                break;
200                            }
201                        }
202                    }
203                    dispatchMatched();
204                }
205            }
206        }
207    }
208
209    private boolean isDuplicate(MessageReference node) {
210        boolean duplicate = false;
211        if (enableAudit && audit != null) {
212            duplicate = audit.isDuplicate(node);
213            if (LOG.isDebugEnabled()) {
214                if (duplicate) {
215                    LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId());
216                }
217            }
218        }
219        return duplicate;
220    }
221
222    /**
223     * Discard any expired messages from the matched list. Called from a
224     * synchronized block.
225     *
226     * @throws IOException
227     */
228    protected void removeExpiredMessages() throws IOException {
229        try {
230            matched.reset();
231            while (matched.hasNext()) {
232                MessageReference node = matched.next();
233                node.decrementReferenceCount();
234                if (node.isExpired()) {
235                    matched.remove();
236                    node.decrementReferenceCount();
237                    if (broker.isExpired(node)) {
238                        ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
239                        broker.messageExpired(getContext(), node, this);
240                    }
241                    break;
242                }
243            }
244        } finally {
245            matched.release();
246        }
247    }
248
249    @Override
250    public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
251        synchronized (matchedListMutex) {
252            try {
253                matched.reset();
254                while (matched.hasNext()) {
255                    MessageReference node = matched.next();
256                    node.decrementReferenceCount();
257                    if (node.getMessageId().equals(mdn.getMessageId())) {
258                        synchronized(dispatchLock) {
259                            matched.remove();
260                            getSubscriptionStatistics().getDispatched().increment();
261                            if (isUseTopicSubscriptionInflightStats()) {
262                                dispatched.add(new DispatchedNode(node));
263                                getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
264                            }
265                            node.decrementReferenceCount();
266                        }
267                        break;
268                    }
269                }
270            } finally {
271                matched.release();
272            }
273        }
274    }
275
276    @Override
277    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
278        super.acknowledge(context, ack);
279
280        if (ack.isStandardAck()) {
281            updateStatsOnAck(context, ack);
282        } else if (ack.isPoisonAck()) {
283            if (ack.isInTransaction()) {
284                throw new JMSException("Poison ack cannot be transacted: " + ack);
285            }
286            updateStatsOnAck(context, ack);
287            contractPrefetchExtension(ack.getMessageCount());
288        } else if (ack.isIndividualAck()) {
289            updateStatsOnAck(context, ack);
290            if (ack.isInTransaction()) {
291                expandPrefetchExtension(1);
292            }
293        } else if (ack.isExpiredAck()) {
294            updateStatsOnAck(ack);
295            contractPrefetchExtension(ack.getMessageCount());
296        } else if (ack.isDeliveredAck()) {
297            // Message was delivered but not acknowledged: update pre-fetch counters.
298           expandPrefetchExtension(ack.getMessageCount());
299        } else if (ack.isRedeliveredAck()) {
300            // No processing for redelivered needed
301            return;
302        } else {
303            throw new JMSException("Invalid acknowledgment: " + ack);
304        }
305
306        dispatchMatched();
307    }
308
309    private void updateStatsOnAck(final ConnectionContext context, final MessageAck ack) {
310        if (context.isInTransaction()) {
311            context.getTransaction().addSynchronization(new Synchronization() {
312
313                @Override
314                public void afterRollback() {
315                    contractPrefetchExtension(ack.getMessageCount());
316                }
317
318                @Override
319                public void afterCommit() throws Exception {
320                    contractPrefetchExtension(ack.getMessageCount());
321                    updateStatsOnAck(ack);
322                    dispatchMatched();
323                }
324            });
325        } else {
326            updateStatsOnAck(ack);
327        }
328    }
329
330    @Override
331    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
332
333        // The slave should not deliver pull messages.
334        if (getPrefetchSize() == 0) {
335
336            final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount();
337            prefetchExtension.set(pull.getQuantity());
338            dispatchMatched();
339
340            // If there was nothing dispatched.. we may need to setup a timeout.
341            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
342
343                // immediate timeout used by receiveNoWait()
344                if (pull.getTimeout() == -1) {
345                    // Send a NULL message to signal nothing pending.
346                    dispatch(null);
347                    prefetchExtension.set(0);
348                }
349
350                if (pull.getTimeout() > 0) {
351                    scheduler.executeAfterDelay(new Runnable() {
352
353                        @Override
354                        public void run() {
355                            pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone());
356                        }
357                    }, pull.getTimeout());
358                }
359            }
360        }
361        return null;
362    }
363
364    /**
365     * Occurs when a pull times out. If nothing has been dispatched since the
366     * timeout was setup, then send the NULL message.
367     */
368    private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
369        synchronized (matchedListMutex) {
370            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) {
371                try {
372                    dispatch(null);
373                } catch (Exception e) {
374                    context.getConnection().serviceException(e);
375                } finally {
376                    prefetchExtension.set(0);
377                }
378            }
379        }
380    }
381
382    /**
383     * Update the statistics on message ack.
384     * @param ack
385     */
386    private void updateStatsOnAck(final MessageAck ack) {
387        //Allow disabling inflight stats to save memory usage
388        if (isUseTopicSubscriptionInflightStats()) {
389            synchronized(dispatchLock) {
390                boolean inAckRange = false;
391                List<DispatchedNode> removeList = new ArrayList<>();
392                for (final DispatchedNode node : dispatched) {
393                    MessageId messageId = node.getMessageId();
394                    if (ack.getFirstMessageId() == null
395                            || ack.getFirstMessageId().equals(messageId)) {
396                        inAckRange = true;
397                    }
398                    if (inAckRange) {
399                        removeList.add(node);
400                        if (ack.getLastMessageId().equals(messageId)) {
401                            break;
402                        }
403                    }
404                }
405
406                for (final DispatchedNode node : removeList) {
407                    dispatched.remove(node);
408                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
409
410                    final Destination destination = node.getDestination();
411                    incrementStatsOnAck(destination, ack, 1);
412                    if (!ack.isInTransaction()) {
413                        contractPrefetchExtension(1);
414                    }
415                }
416            }
417        } else {
418            if (singleDestination && destination != null) {
419                incrementStatsOnAck(destination, ack, ack.getMessageCount());
420            }
421            if (!ack.isInTransaction()) {
422                contractPrefetchExtension(ack.getMessageCount());
423            }
424        }
425    }
426
427    private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) {
428        getSubscriptionStatistics().getDequeues().add(count);
429        destination.getDestinationStatistics().getDequeues().add(count);
430        destination.getDestinationStatistics().getInflight().subtract(count);
431        if (info.isNetworkSubscription()) {
432            destination.getDestinationStatistics().getForwards().add(count);
433        }
434        if (ack.isExpiredAck()) {
435            destination.getDestinationStatistics().getExpired().add(count);
436        }
437    }
438
439    @Override
440    public int countBeforeFull() {
441        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
442    }
443
444    @Override
445    public int getPendingQueueSize() {
446        return matched();
447    }
448
449    @Override
450    public long getPendingMessageSize() {
451        synchronized (matchedListMutex) {
452            return matched.messageSize();
453        }
454    }
455
456    @Override
457    public int getDispatchedQueueSize() {
458        return (int)(getSubscriptionStatistics().getDispatched().getCount() -
459                     getSubscriptionStatistics().getDequeues().getCount());
460    }
461
462    public int getMaximumPendingMessages() {
463        return maximumPendingMessages;
464    }
465
466    @Override
467    public long getDispatchedCounter() {
468        return getSubscriptionStatistics().getDispatched().getCount();
469    }
470
471    @Override
472    public long getEnqueueCounter() {
473        return getSubscriptionStatistics().getEnqueues().getCount();
474    }
475
476    @Override
477    public long getDequeueCounter() {
478        return getSubscriptionStatistics().getDequeues().getCount();
479    }
480
481    /**
482     * @return the number of messages discarded due to being a slow consumer
483     */
484    public int discarded() {
485        synchronized (matchedListMutex) {
486            return discarded;
487        }
488    }
489
490    /**
491     * @return the number of matched messages (messages targeted for the
492     *         subscription but not yet able to be dispatched due to the
493     *         prefetch buffer being full).
494     */
495    public int matched() {
496        synchronized (matchedListMutex) {
497            return matched.size();
498        }
499    }
500
501    /**
502     * Sets the maximum number of pending messages that can be matched against
503     * this consumer before old messages are discarded.
504     */
505    public void setMaximumPendingMessages(int maximumPendingMessages) {
506        this.maximumPendingMessages = maximumPendingMessages;
507    }
508
509    public MessageEvictionStrategy getMessageEvictionStrategy() {
510        return messageEvictionStrategy;
511    }
512
513    /**
514     * Sets the eviction strategy used to decide which message to evict when the
515     * slow consumer needs to discard messages
516     */
517    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
518        this.messageEvictionStrategy = messageEvictionStrategy;
519    }
520
521    public int getMaxProducersToAudit() {
522        return maxProducersToAudit;
523    }
524
525    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
526        this.maxProducersToAudit = maxProducersToAudit;
527        if (audit != null) {
528            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
529        }
530    }
531
532    public int getMaxAuditDepth() {
533        return maxAuditDepth;
534    }
535
536    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
537        this.maxAuditDepth = maxAuditDepth;
538        if (audit != null) {
539            audit.setAuditDepth(maxAuditDepth);
540        }
541    }
542
543    public boolean isEnableAudit() {
544        return enableAudit;
545    }
546
547    public synchronized void setEnableAudit(boolean enableAudit) {
548        this.enableAudit = enableAudit;
549        if (enableAudit && audit == null) {
550            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
551        }
552    }
553
554    // Implementation methods
555    // -------------------------------------------------------------------------
556    @Override
557    public boolean isFull() {
558        return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize();
559    }
560
561    @Override
562    public int getInFlightSize() {
563        return getDispatchedQueueSize();
564    }
565
566    /**
567     * @return true when 60% or more room is left for dispatching messages
568     */
569    @Override
570    public boolean isLowWaterMark() {
571        return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
572    }
573
574    /**
575     * @return true when 10% or less room is left for dispatching messages
576     */
577    @Override
578    public boolean isHighWaterMark() {
579        return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
580    }
581
582    /**
583     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
584     */
585    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
586        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
587    }
588
589    /**
590     * @return the memoryUsageHighWaterMark
591     */
592    public int getMemoryUsageHighWaterMark() {
593        return this.memoryUsageHighWaterMark;
594    }
595
596    /**
597     * @return the usageManager
598     */
599    public SystemUsage getUsageManager() {
600        return this.usageManager;
601    }
602
603    /**
604     * @return the matched
605     */
606    public PendingMessageCursor getMatched() {
607        return this.matched;
608    }
609
610    /**
611     * @param matched the matched to set
612     */
613    public void setMatched(PendingMessageCursor matched) {
614        this.matched = matched;
615    }
616
617    /**
618     * inform the MessageConsumer on the client to change it's prefetch
619     *
620     * @param newPrefetch
621     */
622    @Override
623    public void updateConsumerPrefetch(int newPrefetch) {
624        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
625            ConsumerControl cc = new ConsumerControl();
626            cc.setConsumerId(info.getConsumerId());
627            cc.setPrefetch(newPrefetch);
628            context.getConnection().dispatchAsync(cc);
629        }
630    }
631
632    private void dispatchMatched() throws IOException {
633        synchronized (matchedListMutex) {
634            if (!matched.isEmpty() && !isFull()) {
635                try {
636                    matched.reset();
637
638                    while (matched.hasNext() && !isFull()) {
639                        MessageReference message = matched.next();
640                        message.decrementReferenceCount();
641                        matched.remove();
642                        // Message may have been sitting in the matched list a while
643                        // waiting for the consumer to ak the message.
644                        if (message.isExpired()) {
645                            discard(message);
646                            continue; // just drop it.
647                        }
648                        dispatch(message);
649                    }
650                } finally {
651                    matched.release();
652                }
653            }
654        }
655    }
656
657    private void dispatch(final MessageReference node) throws IOException {
658        Message message = node != null ? node.getMessage() : null;
659        if (node != null) {
660            node.incrementReferenceCount();
661        }
662        // Make sure we can dispatch a message.
663        MessageDispatch md = new MessageDispatch();
664        md.setMessage(message);
665        md.setConsumerId(info.getConsumerId());
666        if (node != null) {
667            md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
668            synchronized(dispatchLock) {
669                getSubscriptionStatistics().getDispatched().increment();
670                if (isUseTopicSubscriptionInflightStats()) {
671                    dispatched.add(new DispatchedNode(node));
672                    getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
673                }
674            }
675
676            // Keep track if this subscription is receiving messages from a single destination.
677            if (singleDestination) {
678                if (destination == null) {
679                    destination = (Destination)node.getRegionDestination();
680                } else {
681                    if (destination != node.getRegionDestination()) {
682                        singleDestination = false;
683                    }
684                }
685            }
686
687            if (getPrefetchSize() == 0) {
688                decrementPrefetchExtension(1);
689            }
690        }
691
692        if (info.isDispatchAsync()) {
693            if (node != null) {
694                md.setTransmitCallback(new TransmitCallback() {
695
696                    @Override
697                    public void onSuccess() {
698                        Destination regionDestination = (Destination) node.getRegionDestination();
699                        regionDestination.getDestinationStatistics().getDispatched().increment();
700                        regionDestination.getDestinationStatistics().getInflight().increment();
701                        node.decrementReferenceCount();
702                    }
703
704                    @Override
705                    public void onFailure() {
706                        Destination regionDestination = (Destination) node.getRegionDestination();
707                        regionDestination.getDestinationStatistics().getDispatched().increment();
708                        regionDestination.getDestinationStatistics().getInflight().increment();
709                        node.decrementReferenceCount();
710                    }
711                });
712            }
713            context.getConnection().dispatchAsync(md);
714        } else {
715            context.getConnection().dispatchSync(md);
716            if (node != null) {
717                Destination regionDestination = (Destination) node.getRegionDestination();
718                regionDestination.getDestinationStatistics().getDispatched().increment();
719                regionDestination.getDestinationStatistics().getInflight().increment();
720                node.decrementReferenceCount();
721            }
722        }
723    }
724
725    private void discard(MessageReference message) {
726        discarding = true;
727        try {
728            message.decrementReferenceCount();
729            matched.remove(message);
730            discarded++;
731            if (destination != null) {
732                destination.getDestinationStatistics().getDequeues().increment();
733            }
734            LOG.debug("{}, discarding message {}", this, message);
735            Destination dest = (Destination) message.getRegionDestination();
736            if (dest != null) {
737                dest.messageDiscarded(getContext(), this, message);
738            }
739            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
740        } finally {
741            discarding = false;
742        }
743    }
744
745    @Override
746    public String toString() {
747        return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
748                + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get()
749                + ", usePrefetchExtension=" + isUsePrefetchExtension();
750    }
751
752    @Override
753    public void destroy() {
754        this.active=false;
755        synchronized (matchedListMutex) {
756            try {
757                matched.destroy();
758            } catch (Exception e) {
759                LOG.warn("Failed to destroy cursor", e);
760            }
761        }
762        setSlowConsumer(false);
763        synchronized(dispatchLock) {
764            dispatched.clear();
765        }
766    }
767
768    @Override
769    public int getPrefetchSize() {
770        return info.getPrefetchSize();
771    }
772
773    @Override
774    public void setPrefetchSize(int newSize) {
775        info.setPrefetchSize(newSize);
776        try {
777            dispatchMatched();
778        } catch(Exception e) {
779            LOG.trace("Caught exception on dispatch after prefetch size change.");
780        }
781    }
782
783    public boolean isUseTopicSubscriptionInflightStats() {
784        return useTopicSubscriptionInflightStats;
785    }
786
787    public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
788        this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
789    }
790
791    private static class DispatchedNode {
792        private final int size;
793        private final MessageId messageId;
794        private final Destination destination;
795
796        public DispatchedNode(final MessageReference node) {
797            super();
798            this.size = node.getSize();
799            this.messageId = node.getMessageId();
800            this.destination = node.getRegionDestination() instanceof Destination ?
801                    ((Destination)node.getRegionDestination()) : null;
802        }
803
804        public long getSize() {
805            return size;
806        }
807
808        public MessageId getMessageId() {
809            return messageId;
810        }
811
812        public Destination getDestination() {
813            return destination;
814        }
815    }
816
817}