001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.concurrent.CountDownLatch;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicInteger;
026    
027    import javax.jms.InvalidSelectorException;
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.broker.Broker;
031    import org.apache.activemq.broker.ConnectionContext;
032    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
034    import org.apache.activemq.command.ActiveMQMessage;
035    import org.apache.activemq.command.ConsumerControl;
036    import org.apache.activemq.command.ConsumerInfo;
037    import org.apache.activemq.command.Message;
038    import org.apache.activemq.command.MessageAck;
039    import org.apache.activemq.command.MessageDispatch;
040    import org.apache.activemq.command.MessageDispatchNotification;
041    import org.apache.activemq.command.MessageId;
042    import org.apache.activemq.command.MessagePull;
043    import org.apache.activemq.command.Response;
044    import org.apache.activemq.thread.Scheduler;
045    import org.apache.activemq.transaction.Synchronization;
046    import org.apache.activemq.usage.SystemUsage;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * A subscription that honors the pre-fetch option of the ConsumerInfo.
052     */
053    public abstract class PrefetchSubscription extends AbstractSubscription {
054    
055        private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
056        protected final Scheduler scheduler;
057    
058        protected PendingMessageCursor pending;
059        protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
060        protected final AtomicInteger prefetchExtension = new AtomicInteger();
061        protected boolean usePrefetchExtension = true;
062        protected long enqueueCounter;
063        protected long dispatchCounter;
064        protected long dequeueCounter;
065        private int maxProducersToAudit=32;
066        private int maxAuditDepth=2048;
067        protected final SystemUsage usageManager;
068        protected final Object pendingLock = new Object();
069        protected final Object dispatchLock = new Object();
070        private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
071    
072        public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
073            super(broker,context, info);
074            this.usageManager=usageManager;
075            pending = cursor;
076            this.scheduler = broker.getScheduler();
077        }
078    
079        public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
080            this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
081        }
082    
083        /**
084         * Allows a message to be pulled on demand by a client
085         */
086        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
087            // The slave should not deliver pull messages.
088            // TODO: when the slave becomes a master, He should send a NULL message to all the
089            // consumers to 'wake them up' in case they were waiting for a message.
090            if (getPrefetchSize() == 0 && !isSlave()) {
091    
092                prefetchExtension.incrementAndGet();
093                final long dispatchCounterBeforePull = dispatchCounter;
094    
095                // Have the destination push us some messages.
096                for (Destination dest : destinations) {
097                    dest.iterate();
098                }
099                dispatchPending();
100    
101                synchronized(this) {
102                    // If there was nothing dispatched.. we may need to setup a timeout.
103                    if (dispatchCounterBeforePull == dispatchCounter) {
104                        // immediate timeout used by receiveNoWait()
105                        if (pull.getTimeout() == -1) {
106                            // Send a NULL message.
107                            add(QueueMessageReference.NULL_MESSAGE);
108                            dispatchPending();
109                        }
110                        if (pull.getTimeout() > 0) {
111                            scheduler.executeAfterDelay(new Runnable() {
112                                @Override
113                                public void run() {
114                                    pullTimeout(dispatchCounterBeforePull);
115                                }
116                            }, pull.getTimeout());
117                        }
118                    }
119                }
120            }
121            return null;
122        }
123    
124        /**
125         * Occurs when a pull times out. If nothing has been dispatched since the
126         * timeout was setup, then send the NULL message.
127         */
128        final void pullTimeout(long dispatchCounterBeforePull) {
129            synchronized (pendingLock) {
130                if (dispatchCounterBeforePull == dispatchCounter) {
131                    try {
132                        add(QueueMessageReference.NULL_MESSAGE);
133                        dispatchPending();
134                    } catch (Exception e) {
135                        context.getConnection().serviceException(e);
136                    }
137                }
138            }
139        }
140    
141        public void add(MessageReference node) throws Exception {
142            synchronized (pendingLock) {
143                // The destination may have just been removed...
144                if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
145                    // perhaps we should inform the caller that we are no longer valid to dispatch to?
146                    return;
147                }
148    
149                // Don't increment for the pullTimeout control message.
150                if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
151                    enqueueCounter++;
152                }
153                pending.addMessageLast(node);
154            }
155            dispatchPending();
156        }
157    
158        public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
159            synchronized(pendingLock) {
160                try {
161                    pending.reset();
162                    while (pending.hasNext()) {
163                        MessageReference node = pending.next();
164                        node.decrementReferenceCount();
165                        if (node.getMessageId().equals(mdn.getMessageId())) {
166                            // Synchronize between dispatched list and removal of messages from pending list
167                            // related to remove subscription action
168                            synchronized(dispatchLock) {
169                                pending.remove();
170                                createMessageDispatch(node, node.getMessage());
171                                dispatched.add(node);
172                                onDispatch(node, node.getMessage());
173                            }
174                            return;
175                        }
176                    }
177                } finally {
178                    pending.release();
179                }
180            }
181            throw new JMSException(
182                    "Slave broker out of sync with master: Dispatched message ("
183                            + mdn.getMessageId() + ") was not in the pending list for "
184                            + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
185        }
186    
187        public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
188            // Handle the standard acknowledgment case.
189            boolean callDispatchMatched = false;
190            Destination destination = null;
191    
192            if (!isSlave()) {
193                if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
194                    // suppress unexpected ack exception in this expected case
195                    LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
196                    return;
197                }
198            }
199            if (LOG.isTraceEnabled()) {
200                LOG.trace("ack:" + ack);
201            }
202            synchronized(dispatchLock) {
203                if (ack.isStandardAck()) {
204                    // First check if the ack matches the dispatched. When using failover this might
205                    // not be the case. We don't ever want to ack the wrong messages.
206                    assertAckMatchesDispatched(ack);
207    
208                    // Acknowledge all dispatched messages up till the message id of
209                    // the acknowledgment.
210                    int index = 0;
211                    boolean inAckRange = false;
212                    List<MessageReference> removeList = new ArrayList<MessageReference>();
213                    for (final MessageReference node : dispatched) {
214                        MessageId messageId = node.getMessageId();
215                        if (ack.getFirstMessageId() == null
216                                || ack.getFirstMessageId().equals(messageId)) {
217                            inAckRange = true;
218                        }
219                        if (inAckRange) {
220                            // Don't remove the nodes until we are committed.
221                            if (!context.isInTransaction()) {
222                                dequeueCounter++;
223                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
224                                removeList.add(node);
225                            } else {
226                                registerRemoveSync(context, node);
227                            }
228                            index++;
229                            acknowledge(context, ack, node);
230                            if (ack.getLastMessageId().equals(messageId)) {
231                                // contract prefetch if dispatch required a pull
232                                if (getPrefetchSize() == 0) {
233                                    // Protect extension update against parallel updates.
234                                    while (true) {
235                                        int currentExtension = prefetchExtension.get();
236                                        int newExtension = Math.max(0, currentExtension - index);
237                                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
238                                            break;
239                                        }
240                                    }
241                                } else if (usePrefetchExtension && context.isInTransaction()) {
242                                    // extend prefetch window only if not a pulling consumer
243                                    while (true) {
244                                        int currentExtension = prefetchExtension.get();
245                                        int newExtension = Math.max(currentExtension, index);
246                                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
247                                            break;
248                                        }
249                                    }
250                                }
251                                destination = node.getRegionDestination();
252                                callDispatchMatched = true;
253                                break;
254                            }
255                        }
256                    }
257                    for (final MessageReference node : removeList) {
258                        dispatched.remove(node);
259                    }
260                    // this only happens after a reconnect - get an ack which is not
261                    // valid
262                    if (!callDispatchMatched) {
263                        LOG.warn("Could not correlate acknowledgment with dispatched message: "
264                                      + ack);
265                    }
266                } else if (ack.isIndividualAck()) {
267                    // Message was delivered and acknowledge - but only delete the
268                    // individual message
269                    for (final MessageReference node : dispatched) {
270                        MessageId messageId = node.getMessageId();
271                        if (ack.getLastMessageId().equals(messageId)) {
272                            // Don't remove the nodes until we are committed - immediateAck option
273                            if (!context.isInTransaction()) {
274                                dequeueCounter++;
275                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
276                                dispatched.remove(node);
277                            } else {
278                                registerRemoveSync(context, node);
279                            }
280    
281                            // Protect extension update against parallel updates.
282                            while (true) {
283                                int currentExtension = prefetchExtension.get();
284                                int newExtension = Math.max(0, currentExtension - 1);
285                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
286                                    break;
287                                }
288                            }
289                            acknowledge(context, ack, node);
290                            destination = node.getRegionDestination();
291                            callDispatchMatched = true;
292                            break;
293                        }
294                    }
295                }else if (ack.isDeliveredAck()) {
296                    // Message was delivered but not acknowledged: update pre-fetch
297                    // counters.
298                    int index = 0;
299                    for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
300                        final MessageReference node = iter.next();
301                        if (node.isExpired()) {
302                            if (broker.isExpired(node)) {
303                                node.getRegionDestination().messageExpired(context, this, node);
304                            }
305                            iter.remove();
306                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
307                        }
308                        if (ack.getLastMessageId().equals(node.getMessageId())) {
309                            if (usePrefetchExtension) {
310                                while (true) {
311                                    int currentExtension = prefetchExtension.get();
312                                    int newExtension = Math.max(currentExtension, index + 1);
313                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
314                                        break;
315                                    }
316                                }
317                            }
318                            destination = node.getRegionDestination();
319                            callDispatchMatched = true;
320                            break;
321                        }
322                    }
323                    if (!callDispatchMatched) {
324                        throw new JMSException(
325                                "Could not correlate acknowledgment with dispatched message: "
326                                        + ack);
327                    }
328                } else if (ack.isRedeliveredAck()) {
329                    // Message was re-delivered but it was not yet considered to be
330                    // a DLQ message.
331                    boolean inAckRange = false;
332                    for (final MessageReference node : dispatched) {
333                        MessageId messageId = node.getMessageId();
334                        if (ack.getFirstMessageId() == null
335                                || ack.getFirstMessageId().equals(messageId)) {
336                            inAckRange = true;
337                        }
338                        if (inAckRange) {
339                            if (ack.getLastMessageId().equals(messageId)) {
340                                destination = node.getRegionDestination();
341                                callDispatchMatched = true;
342                                break;
343                            }
344                        }
345                    }
346                    if (!callDispatchMatched) {
347                        throw new JMSException(
348                                "Could not correlate acknowledgment with dispatched message: "
349                                        + ack);
350                    }
351                } else if (ack.isPoisonAck()) {
352                    // TODO: what if the message is already in a DLQ???
353                    // Handle the poison ACK case: we need to send the message to a
354                    // DLQ
355                    if (ack.isInTransaction()) {
356                        throw new JMSException("Poison ack cannot be transacted: "
357                                + ack);
358                    }
359                    int index = 0;
360                    boolean inAckRange = false;
361                    List<MessageReference> removeList = new ArrayList<MessageReference>();
362                    for (final MessageReference node : dispatched) {
363                        MessageId messageId = node.getMessageId();
364                        if (ack.getFirstMessageId() == null
365                                || ack.getFirstMessageId().equals(messageId)) {
366                            inAckRange = true;
367                        }
368                        if (inAckRange) {
369                            if (ack.getPoisonCause() != null) {
370                                node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
371                                        ack.getPoisonCause().toString());
372                            }
373                            sendToDLQ(context, node);
374                            node.getRegionDestination().getDestinationStatistics()
375                                    .getInflight().decrement();
376                            removeList.add(node);
377                            dequeueCounter++;
378                            index++;
379                            acknowledge(context, ack, node);
380                            if (ack.getLastMessageId().equals(messageId)) {
381                                while (true) {
382                                    int currentExtension = prefetchExtension.get();
383                                    int newExtension = Math.max(0, currentExtension - (index + 1));
384                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
385                                        break;
386                                    }
387                                }
388                                destination = node.getRegionDestination();
389                                callDispatchMatched = true;
390                                break;
391                            }
392                        }
393                    }
394                    for (final MessageReference node : removeList) {
395                        dispatched.remove(node);
396                    }
397                    if (!callDispatchMatched) {
398                        throw new JMSException(
399                                "Could not correlate acknowledgment with dispatched message: "
400                                        + ack);
401                    }
402                }
403            }
404            if (callDispatchMatched && destination != null) {
405                destination.wakeup();
406                dispatchPending();
407            } else {
408                if (isSlave()) {
409                    throw new JMSException(
410                            "Slave broker out of sync with master: Acknowledgment ("
411                                    + ack + ") was not in the dispatch list: "
412                                    + dispatched);
413                } else {
414                    LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
415                            + ack);
416                }
417            }
418        }
419    
420        private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
421            // setup a Synchronization to remove nodes from the
422            // dispatched list.
423            context.getTransaction().addSynchronization(
424                    new Synchronization() {
425    
426                        @Override
427                        public void afterCommit()
428                                throws Exception {
429                            synchronized(dispatchLock) {
430                                dequeueCounter++;
431                                dispatched.remove(node);
432                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
433                            }
434                            node.getRegionDestination().wakeup();
435                            dispatchPending();
436                        }
437    
438                        @Override
439                        public void afterRollback() throws Exception {
440                            synchronized(dispatchLock) {
441                                if (isSlave()) {
442                                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
443                                } else {
444                                    // poisionAck will decrement - otherwise still inflight on client
445                                }
446                            }
447                        }
448                    });
449        }
450    
451        /**
452         * Checks an ack versus the contents of the dispatched list.
453         *  called with dispatchLock held
454         * @param ack
455         * @throws JMSException if it does not match
456         */
457        protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
458            MessageId firstAckedMsg = ack.getFirstMessageId();
459            MessageId lastAckedMsg = ack.getLastMessageId();
460            int checkCount = 0;
461            boolean checkFoundStart = false;
462            boolean checkFoundEnd = false;
463            for (MessageReference node : dispatched) {
464    
465                if (firstAckedMsg == null) {
466                    checkFoundStart = true;
467                } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
468                    checkFoundStart = true;
469                }
470    
471                if (checkFoundStart) {
472                    checkCount++;
473                }
474    
475                if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
476                    checkFoundEnd = true;
477                    break;
478                }
479            }
480            if (!checkFoundStart && firstAckedMsg != null)
481                throw new JMSException("Unmatched acknowledge: " + ack
482                        + "; Could not find Message-ID " + firstAckedMsg
483                        + " in dispatched-list (start of ack)");
484            if (!checkFoundEnd && lastAckedMsg != null)
485                throw new JMSException("Unmatched acknowledge: " + ack
486                        + "; Could not find Message-ID " + lastAckedMsg
487                        + " in dispatched-list (end of ack)");
488            if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
489                throw new JMSException("Unmatched acknowledge: " + ack
490                        + "; Expected message count (" + ack.getMessageCount()
491                        + ") differs from count in dispatched-list (" + checkCount
492                        + ")");
493            }
494        }
495    
496        /**
497         * @param context
498         * @param node
499         * @throws IOException
500         * @throws Exception
501         */
502        protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
503            broker.getRoot().sendToDeadLetterQueue(context, node, this);
504        }
505    
506        public int getInFlightSize() {
507            return dispatched.size();
508        }
509    
510        /**
511         * Used to determine if the broker can dispatch to the consumer.
512         *
513         * @return
514         */
515        public boolean isFull() {
516            return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
517        }
518    
519        /**
520         * @return true when 60% or more room is left for dispatching messages
521         */
522        public boolean isLowWaterMark() {
523            return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
524        }
525    
526        /**
527         * @return true when 10% or less room is left for dispatching messages
528         */
529        public boolean isHighWaterMark() {
530            return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
531        }
532    
533        @Override
534        public int countBeforeFull() {
535            return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
536        }
537    
538        public int getPendingQueueSize() {
539            return pending.size();
540        }
541    
542        public int getDispatchedQueueSize() {
543            return dispatched.size();
544        }
545    
546        public long getDequeueCounter() {
547            return dequeueCounter;
548        }
549    
550        public long getDispatchedCounter() {
551            return dispatchCounter;
552        }
553    
554        public long getEnqueueCounter() {
555            return enqueueCounter;
556        }
557    
558        @Override
559        public boolean isRecoveryRequired() {
560            return pending.isRecoveryRequired();
561        }
562    
563        public PendingMessageCursor getPending() {
564            return this.pending;
565        }
566    
567        public void setPending(PendingMessageCursor pending) {
568            this.pending = pending;
569            if (this.pending!=null) {
570                this.pending.setSystemUsage(usageManager);
571                this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
572            }
573        }
574    
575       @Override
576        public void add(ConnectionContext context, Destination destination) throws Exception {
577            synchronized(pendingLock) {
578                super.add(context, destination);
579                pending.add(context, destination);
580            }
581        }
582    
583        @Override
584        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
585            List<MessageReference> rc = new ArrayList<MessageReference>();
586            synchronized(pendingLock) {
587                super.remove(context, destination);
588                // Here is a potential problem concerning Inflight stat:
589                // Messages not already committed or rolled back may not be removed from dispatched list at the moment
590                // Except if each commit or rollback callback action comes before remove of subscriber.
591                rc.addAll(pending.remove(context, destination));
592    
593                // Synchronized to DispatchLock
594                synchronized(dispatchLock) {
595                    ArrayList<MessageReference> references = new ArrayList<MessageReference>();
596                    for (MessageReference r : dispatched) {
597                        if( r.getRegionDestination() == destination) {
598                            references.add(r);
599                        }
600                    }
601                    rc.addAll(references);
602                    destination.getDestinationStatistics().getDispatched().subtract(references.size());
603                    destination.getDestinationStatistics().getInflight().subtract(references.size());
604                    dispatched.removeAll(references);
605                }
606            }
607            return rc;
608        }
609    
610        protected void dispatchPending() throws IOException {
611            if (!isSlave()) {
612               synchronized(pendingLock) {
613                    try {
614                        int numberToDispatch = countBeforeFull();
615                        if (numberToDispatch > 0) {
616                            setSlowConsumer(false);
617                            setPendingBatchSize(pending, numberToDispatch);
618                            int count = 0;
619                            pending.reset();
620                            while (pending.hasNext() && !isFull()
621                                    && count < numberToDispatch) {
622                                MessageReference node = pending.next();
623                                if (node == null) {
624                                    break;
625                                }
626    
627                                // Synchronize between dispatched list and remove of message from pending list
628                                // related to remove subscription action
629                                synchronized(dispatchLock) {
630                                    pending.remove();
631                                    node.decrementReferenceCount();
632                                    if( !isDropped(node) && canDispatch(node)) {
633    
634                                        // Message may have been sitting in the pending
635                                        // list a while waiting for the consumer to ak the message.
636                                        if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
637                                            //increment number to dispatch
638                                            numberToDispatch++;
639                                            if (broker.isExpired(node)) {
640                                                node.getRegionDestination().messageExpired(context, this, node);
641                                            }
642                                            continue;
643                                        }
644                                        dispatch(node);
645                                        count++;
646                                    }
647                                }
648                            }
649                        } else if (!isSlowConsumer()) {
650                            setSlowConsumer(true);
651                            for (Destination dest :destinations) {
652                                dest.slowConsumer(context, this);
653                            }
654                        }
655                    } finally {
656                        pending.release();
657                    }
658                }
659            }
660        }
661    
662        protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
663            pending.setMaxBatchSize(numberToDispatch);
664        }
665    
666        // called with dispatchLock held
667        protected boolean dispatch(final MessageReference node) throws IOException {
668            final Message message = node.getMessage();
669            if (message == null) {
670                return false;
671            }
672    
673            okForAckAsDispatchDone.countDown();
674    
675            // No reentrant lock - Patch needed to IndirectMessageReference on method lock
676            if (!isSlave()) {
677    
678                MessageDispatch md = createMessageDispatch(node, message);
679                // NULL messages don't count... they don't get Acked.
680                if (node != QueueMessageReference.NULL_MESSAGE) {
681                    dispatchCounter++;
682                    dispatched.add(node);
683                } else {
684                    while (true) {
685                        int currentExtension = prefetchExtension.get();
686                        int newExtension = Math.max(0, currentExtension - 1);
687                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
688                            break;
689                        }
690                    }
691                }
692                if (info.isDispatchAsync()) {
693                    md.setTransmitCallback(new Runnable() {
694    
695                        public void run() {
696                            // Since the message gets queued up in async dispatch,
697                            // we don't want to
698                            // decrease the reference count until it gets put on the
699                            // wire.
700                            onDispatch(node, message);
701                        }
702                    });
703                    context.getConnection().dispatchAsync(md);
704                } else {
705                    context.getConnection().dispatchSync(md);
706                    onDispatch(node, message);
707                }
708                return true;
709            } else {
710                return false;
711            }
712        }
713    
714        protected void onDispatch(final MessageReference node, final Message message) {
715            if (node.getRegionDestination() != null) {
716                if (node != QueueMessageReference.NULL_MESSAGE) {
717                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
718                    node.getRegionDestination().getDestinationStatistics().getInflight().increment();
719                    if (LOG.isTraceEnabled()) {
720                        LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
721                                + message.getDestination()  + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
722                    }
723                }
724            }
725    
726            if (info.isDispatchAsync()) {
727                try {
728                    dispatchPending();
729                } catch (IOException e) {
730                    context.getConnection().serviceExceptionAsync(e);
731                }
732            }
733        }
734    
735        /**
736         * inform the MessageConsumer on the client to change it's prefetch
737         *
738         * @param newPrefetch
739         */
740        public void updateConsumerPrefetch(int newPrefetch) {
741            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
742                ConsumerControl cc = new ConsumerControl();
743                cc.setConsumerId(info.getConsumerId());
744                cc.setPrefetch(newPrefetch);
745                context.getConnection().dispatchAsync(cc);
746            }
747        }
748    
749        /**
750         * @param node
751         * @param message
752         * @return MessageDispatch
753         */
754        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
755            MessageDispatch md = new MessageDispatch();
756            md.setConsumerId(info.getConsumerId());
757    
758            if (node == QueueMessageReference.NULL_MESSAGE) {
759                md.setMessage(null);
760                md.setDestination(null);
761            } else {
762                md.setDestination(node.getRegionDestination().getActiveMQDestination());
763                md.setMessage(message);
764                md.setRedeliveryCounter(node.getRedeliveryCounter());
765            }
766    
767            return md;
768        }
769    
770        /**
771         * Use when a matched message is about to be dispatched to the client.
772         *
773         * @param node
774         * @return false if the message should not be dispatched to the client
775         *         (another sub may have already dispatched it for example).
776         * @throws IOException
777         */
778        protected abstract boolean canDispatch(MessageReference node) throws IOException;
779    
780        protected abstract boolean isDropped(MessageReference node);
781    
782        /**
783         * Used during acknowledgment to remove the message.
784         *
785         * @throws IOException
786         */
787        protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
788    
789    
790        public int getMaxProducersToAudit() {
791            return maxProducersToAudit;
792        }
793    
794        public void setMaxProducersToAudit(int maxProducersToAudit) {
795            this.maxProducersToAudit = maxProducersToAudit;
796        }
797    
798        public int getMaxAuditDepth() {
799            return maxAuditDepth;
800        }
801    
802        public void setMaxAuditDepth(int maxAuditDepth) {
803            this.maxAuditDepth = maxAuditDepth;
804        }
805    
806        public boolean isUsePrefetchExtension() {
807            return usePrefetchExtension;
808        }
809    
810        public void setUsePrefetchExtension(boolean usePrefetchExtension) {
811            this.usePrefetchExtension = usePrefetchExtension;
812        }
813    
814        protected int getPrefetchExtension() {
815            return this.prefetchExtension.get();
816        }
817    
818        @Override
819        public void setPrefetchSize(int prefetchSize) {
820            this.info.setPrefetchSize(prefetchSize);
821            try {
822                this.dispatchPending();
823            } catch (Exception e) {
824                LOG.trace("Caught exception during dispatch after prefetch change.", e);
825            }
826        }
827    }