001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.concurrent.CountDownLatch;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicInteger;
026    
027    import javax.jms.JMSException;
028    
029    import org.apache.activemq.broker.Broker;
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
032    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
033    import org.apache.activemq.command.ActiveMQMessage;
034    import org.apache.activemq.command.ConsumerControl;
035    import org.apache.activemq.command.ConsumerInfo;
036    import org.apache.activemq.command.Message;
037    import org.apache.activemq.command.MessageAck;
038    import org.apache.activemq.command.MessageDispatch;
039    import org.apache.activemq.command.MessageDispatchNotification;
040    import org.apache.activemq.command.MessageId;
041    import org.apache.activemq.command.MessagePull;
042    import org.apache.activemq.command.Response;
043    import org.apache.activemq.thread.Scheduler;
044    import org.apache.activemq.transaction.Synchronization;
045    import org.apache.activemq.transport.TransmitCallback;
046    import org.apache.activemq.usage.SystemUsage;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * A subscription that honors the pre-fetch option of the ConsumerInfo.
052     */
053    public abstract class PrefetchSubscription extends AbstractSubscription {
054    
    private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
    // Obtained from the broker in the constructor; used by pullMessage to schedule pull timeouts.
    protected final Scheduler scheduler;

    // Cursor over messages that were added to this subscription but not yet dispatched.
    protected PendingMessageCursor pending;
    // Messages handed to the consumer that have not yet been removed by an acknowledgment.
    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
    // Extra dispatch allowance added on top of the configured prefetch size (see countBeforeFull).
    protected final AtomicInteger prefetchExtension = new AtomicInteger();
    // When false, acknowledge() does not grow the prefetch extension for prefetching consumers.
    protected boolean usePrefetchExtension = true;
    // Incremented in add(MessageReference) for every message except the NULL_MESSAGE marker.
    protected long enqueueCounter;
    // Compared before/after a pull to detect whether anything was dispatched; updated outside this view.
    protected long dispatchCounter;
    // Incremented whenever an acknowledged message is counted as consumed.
    protected long dequeueCounter;
    // NOTE(review): not referenced in the visible part of the file; presumably audit settings - confirm.
    private int maxProducersToAudit=32;
    private int maxAuditDepth=2048;
    // Passed to the pending cursor in setPending.
    protected final SystemUsage usageManager;
    // Monitor held while the pending cursor is read or modified.
    protected final Object pendingLock = new Object();
    // Monitor held while the dispatched list is read or modified.
    protected final Object dispatchLock = new Object();
    // acknowledge() ignores acks until this latch reaches zero; the count-down is outside this view.
    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
071    
072        public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException {
073            super(broker,context, info);
074            this.usageManager=usageManager;
075            pending = cursor;
076            try {
077                pending.start();
078            } catch (Exception e) {
079                throw new JMSException(e.getMessage());
080            }
081            this.scheduler = broker.getScheduler();
082        }
083    
    /**
     * Creates a subscription that keeps its pending messages in a new
     * {@link VMPendingMessageCursor} constructed with {@code false}, delegating to the
     * five-argument constructor.
     *
     * @throws JMSException if the delegated constructor fails
     */
    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
    }
087    
088        /**
089         * Allows a message to be pulled on demand by a client
090         */
091        @Override
092        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
093            // The slave should not deliver pull messages.
094            // TODO: when the slave becomes a master, He should send a NULL message to all the
095            // consumers to 'wake them up' in case they were waiting for a message.
096            if (getPrefetchSize() == 0) {
097    
098                prefetchExtension.incrementAndGet();
099                final long dispatchCounterBeforePull = dispatchCounter;
100    
101                // Have the destination push us some messages.
102                for (Destination dest : destinations) {
103                    dest.iterate();
104                }
105                dispatchPending();
106    
107                synchronized(this) {
108                    // If there was nothing dispatched.. we may need to setup a timeout.
109                    if (dispatchCounterBeforePull == dispatchCounter) {
110                        // immediate timeout used by receiveNoWait()
111                        if (pull.getTimeout() == -1) {
112                            // Send a NULL message.
113                            add(QueueMessageReference.NULL_MESSAGE);
114                            dispatchPending();
115                        }
116                        if (pull.getTimeout() > 0) {
117                            scheduler.executeAfterDelay(new Runnable() {
118                                @Override
119                                public void run() {
120                                    pullTimeout(dispatchCounterBeforePull);
121                                }
122                            }, pull.getTimeout());
123                        }
124                    }
125                }
126            }
127            return null;
128        }
129    
130        /**
131         * Occurs when a pull times out. If nothing has been dispatched since the
132         * timeout was setup, then send the NULL message.
133         */
134        final void pullTimeout(long dispatchCounterBeforePull) {
135            synchronized (pendingLock) {
136                if (dispatchCounterBeforePull == dispatchCounter) {
137                    try {
138                        add(QueueMessageReference.NULL_MESSAGE);
139                        dispatchPending();
140                    } catch (Exception e) {
141                        context.getConnection().serviceException(e);
142                    }
143                }
144            }
145        }
146    
147        @Override
148        public void add(MessageReference node) throws Exception {
149            synchronized (pendingLock) {
150                // The destination may have just been removed...
151                if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
152                    // perhaps we should inform the caller that we are no longer valid to dispatch to?
153                    return;
154                }
155    
156                // Don't increment for the pullTimeout control message.
157                if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
158                    enqueueCounter++;
159                }
160                pending.addMessageLast(node);
161            }
162            dispatchPending();
163        }
164    
165        @Override
166        public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
167            synchronized(pendingLock) {
168                try {
169                    pending.reset();
170                    while (pending.hasNext()) {
171                        MessageReference node = pending.next();
172                        node.decrementReferenceCount();
173                        if (node.getMessageId().equals(mdn.getMessageId())) {
174                            // Synchronize between dispatched list and removal of messages from pending list
175                            // related to remove subscription action
176                            synchronized(dispatchLock) {
177                                pending.remove();
178                                createMessageDispatch(node, node.getMessage());
179                                dispatched.add(node);
180                                onDispatch(node, node.getMessage());
181                            }
182                            return;
183                        }
184                    }
185                } finally {
186                    pending.release();
187                }
188            }
189            throw new JMSException(
190                    "Slave broker out of sync with master: Dispatched message ("
191                            + mdn.getMessageId() + ") was not in the pending list for "
192                            + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
193        }
194    
195        @Override
196        public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
197            // Handle the standard acknowledgment case.
198            boolean callDispatchMatched = false;
199            Destination destination = null;
200    
201            if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
202                // suppress unexpected ack exception in this expected case
203                LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
204                return;
205            }
206    
207            if (LOG.isTraceEnabled()) {
208                LOG.trace("ack:" + ack);
209            }
210            synchronized(dispatchLock) {
211                if (ack.isStandardAck()) {
212                    // First check if the ack matches the dispatched. When using failover this might
213                    // not be the case. We don't ever want to ack the wrong messages.
214                    assertAckMatchesDispatched(ack);
215    
216                    // Acknowledge all dispatched messages up till the message id of
217                    // the acknowledgment.
218                    int index = 0;
219                    boolean inAckRange = false;
220                    List<MessageReference> removeList = new ArrayList<MessageReference>();
221                    for (final MessageReference node : dispatched) {
222                        MessageId messageId = node.getMessageId();
223                        if (ack.getFirstMessageId() == null
224                                || ack.getFirstMessageId().equals(messageId)) {
225                            inAckRange = true;
226                        }
227                        if (inAckRange) {
228                            // Don't remove the nodes until we are committed.
229                            if (!context.isInTransaction()) {
230                                dequeueCounter++;
231                                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
232                                removeList.add(node);
233                            } else {
234                                registerRemoveSync(context, node);
235                            }
236                            index++;
237                            acknowledge(context, ack, node);
238                            if (ack.getLastMessageId().equals(messageId)) {
239                                // contract prefetch if dispatch required a pull
240                                if (getPrefetchSize() == 0) {
241                                    // Protect extension update against parallel updates.
242                                    while (true) {
243                                        int currentExtension = prefetchExtension.get();
244                                        int newExtension = Math.max(0, currentExtension - index);
245                                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
246                                            break;
247                                        }
248                                    }
249                                } else if (usePrefetchExtension && context.isInTransaction()) {
250                                    // extend prefetch window only if not a pulling consumer
251                                    while (true) {
252                                        int currentExtension = prefetchExtension.get();
253                                        int newExtension = Math.max(currentExtension, index);
254                                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
255                                            break;
256                                        }
257                                    }
258                                }
259                                destination = (Destination) node.getRegionDestination();
260                                callDispatchMatched = true;
261                                break;
262                            }
263                        }
264                    }
265                    for (final MessageReference node : removeList) {
266                        dispatched.remove(node);
267                    }
268                    // this only happens after a reconnect - get an ack which is not
269                    // valid
270                    if (!callDispatchMatched) {
271                        LOG.warn("Could not correlate acknowledgment with dispatched message: "
272                                      + ack);
273                    }
274                } else if (ack.isIndividualAck()) {
275                    // Message was delivered and acknowledge - but only delete the
276                    // individual message
277                    for (final MessageReference node : dispatched) {
278                        MessageId messageId = node.getMessageId();
279                        if (ack.getLastMessageId().equals(messageId)) {
280                            // Don't remove the nodes until we are committed - immediateAck option
281                            if (!context.isInTransaction()) {
282                                dequeueCounter++;
283                                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
284                                dispatched.remove(node);
285                            } else {
286                                registerRemoveSync(context, node);
287                            }
288    
289                            // Protect extension update against parallel updates.
290                            while (true) {
291                                int currentExtension = prefetchExtension.get();
292                                int newExtension = Math.max(0, currentExtension - 1);
293                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
294                                    break;
295                                }
296                            }
297                            acknowledge(context, ack, node);
298                            destination = (Destination) node.getRegionDestination();
299                            callDispatchMatched = true;
300                            break;
301                        }
302                    }
303                }else if (ack.isDeliveredAck()) {
304                    // Message was delivered but not acknowledged: update pre-fetch
305                    // counters.
306                    int index = 0;
307                    for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
308                        final MessageReference node = iter.next();
309                        Destination nodeDest = (Destination) node.getRegionDestination();
310                        if (node.isExpired()) {
311                            if (broker.isExpired(node)) {
312                                Destination regionDestination = nodeDest;
313                                regionDestination.messageExpired(context, this, node);
314                            }
315                            iter.remove();
316                            nodeDest.getDestinationStatistics().getInflight().decrement();
317                        }
318                        if (ack.getLastMessageId().equals(node.getMessageId())) {
319                            if (usePrefetchExtension) {
320                                while (true) {
321                                    int currentExtension = prefetchExtension.get();
322                                    int newExtension = Math.max(currentExtension, index + 1);
323                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
324                                        break;
325                                    }
326                                }
327                            }
328                            destination = nodeDest;
329                            callDispatchMatched = true;
330                            break;
331                        }
332                    }
333                    if (!callDispatchMatched) {
334                        throw new JMSException(
335                                "Could not correlate acknowledgment with dispatched message: "
336                                        + ack);
337                    }
338                } else if (ack.isRedeliveredAck()) {
339                    // Message was re-delivered but it was not yet considered to be
340                    // a DLQ message.
341                    boolean inAckRange = false;
342                    for (final MessageReference node : dispatched) {
343                        MessageId messageId = node.getMessageId();
344                        if (ack.getFirstMessageId() == null
345                                || ack.getFirstMessageId().equals(messageId)) {
346                            inAckRange = true;
347                        }
348                        if (inAckRange) {
349                            if (ack.getLastMessageId().equals(messageId)) {
350                                destination = (Destination) node.getRegionDestination();
351                                callDispatchMatched = true;
352                                break;
353                            }
354                        }
355                    }
356                    if (!callDispatchMatched) {
357                        throw new JMSException(
358                                "Could not correlate acknowledgment with dispatched message: "
359                                        + ack);
360                    }
361                } else if (ack.isPoisonAck()) {
362                    // TODO: what if the message is already in a DLQ???
363                    // Handle the poison ACK case: we need to send the message to a
364                    // DLQ
365                    if (ack.isInTransaction()) {
366                        throw new JMSException("Poison ack cannot be transacted: "
367                                + ack);
368                    }
369                    int index = 0;
370                    boolean inAckRange = false;
371                    List<MessageReference> removeList = new ArrayList<MessageReference>();
372                    for (final MessageReference node : dispatched) {
373                        MessageId messageId = node.getMessageId();
374                        if (ack.getFirstMessageId() == null
375                                || ack.getFirstMessageId().equals(messageId)) {
376                            inAckRange = true;
377                        }
378                        if (inAckRange) {
379                            if (ack.getPoisonCause() != null) {
380                                node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
381                                        ack.getPoisonCause().toString());
382                            }
383                            sendToDLQ(context, node);
384                            Destination nodeDest = (Destination) node.getRegionDestination();
385                            nodeDest.getDestinationStatistics()
386                                    .getInflight().decrement();
387                            removeList.add(node);
388                            dequeueCounter++;
389                            index++;
390                            acknowledge(context, ack, node);
391                            if (ack.getLastMessageId().equals(messageId)) {
392                                while (true) {
393                                    int currentExtension = prefetchExtension.get();
394                                    int newExtension = Math.max(0, currentExtension - (index + 1));
395                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
396                                        break;
397                                    }
398                                }
399                                destination = nodeDest;
400                                callDispatchMatched = true;
401                                break;
402                            }
403                        }
404                    }
405                    for (final MessageReference node : removeList) {
406                        dispatched.remove(node);
407                    }
408                    if (!callDispatchMatched) {
409                        throw new JMSException(
410                                "Could not correlate acknowledgment with dispatched message: "
411                                        + ack);
412                    }
413                }
414            }
415            if (callDispatchMatched && destination != null) {
416                destination.wakeup();
417                dispatchPending();
418            } else {
419                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
420                        + ack);
421            }
422        }
423    
424        private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
425            // setup a Synchronization to remove nodes from the
426            // dispatched list.
427            context.getTransaction().addSynchronization(
428                    new Synchronization() {
429    
430                        @Override
431                        public void afterCommit()
432                                throws Exception {
433                            Destination nodeDest = (Destination) node.getRegionDestination();
434                            synchronized(dispatchLock) {
435                                dequeueCounter++;
436                                dispatched.remove(node);
437                                nodeDest.getDestinationStatistics().getInflight().decrement();
438                            }
439                            nodeDest.wakeup();
440                            dispatchPending();
441                        }
442    
443                        @Override
444                        public void afterRollback() throws Exception {
445                            synchronized(dispatchLock) {
446                                // poisionAck will decrement - otherwise still inflight on client
447                            }
448                        }
449                    });
450        }
451    
452        /**
453         * Checks an ack versus the contents of the dispatched list.
454         *  called with dispatchLock held
455         * @param ack
456         * @throws JMSException if it does not match
457         */
458        protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
459            MessageId firstAckedMsg = ack.getFirstMessageId();
460            MessageId lastAckedMsg = ack.getLastMessageId();
461            int checkCount = 0;
462            boolean checkFoundStart = false;
463            boolean checkFoundEnd = false;
464            for (MessageReference node : dispatched) {
465    
466                if (firstAckedMsg == null) {
467                    checkFoundStart = true;
468                } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
469                    checkFoundStart = true;
470                }
471    
472                if (checkFoundStart) {
473                    checkCount++;
474                }
475    
476                if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
477                    checkFoundEnd = true;
478                    break;
479                }
480            }
481            if (!checkFoundStart && firstAckedMsg != null)
482                throw new JMSException("Unmatched acknowledge: " + ack
483                        + "; Could not find Message-ID " + firstAckedMsg
484                        + " in dispatched-list (start of ack)");
485            if (!checkFoundEnd && lastAckedMsg != null)
486                throw new JMSException("Unmatched acknowledge: " + ack
487                        + "; Could not find Message-ID " + lastAckedMsg
488                        + " in dispatched-list (end of ack)");
489            if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
490                throw new JMSException("Unmatched acknowledge: " + ack
491                        + "; Expected message count (" + ack.getMessageCount()
492                        + ") differs from count in dispatched-list (" + checkCount
493                        + ")");
494            }
495        }
496    
    /**
     * Hands a message to the root broker's dead letter queue handling, passing this
     * subscription along with it.
     *
     * @param context connection context of the consumer
     * @param node the message reference to send to the dead letter queue
     * @throws IOException declared for callers; propagated from the broker call
     * @throws Exception if the broker call fails
     */
    protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
        broker.getRoot().sendToDeadLetterQueue(context, node, this);
    }
506    
    /**
     * @return the current number of entries in the dispatched list
     */
    @Override
    public int getInFlightSize() {
        return dispatched.size();
    }
511    
512        /**
513         * Used to determine if the broker can dispatch to the consumer.
514         *
515         * @return
516         */
517        @Override
518        public boolean isFull() {
519            return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
520        }
521    
522        /**
523         * @return true when 60% or more room is left for dispatching messages
524         */
525        @Override
526        public boolean isLowWaterMark() {
527            return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
528        }
529    
530        /**
531         * @return true when 10% or less room is left for dispatching messages
532         */
533        @Override
534        public boolean isHighWaterMark() {
535            return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
536        }
537    
538        @Override
539        public int countBeforeFull() {
540            return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
541        }
542    
    /**
     * @return the size reported by the pending cursor
     */
    @Override
    public int getPendingQueueSize() {
        return pending.size();
    }
547    
    /**
     * @return the current number of entries in the dispatched list
     */
    @Override
    public int getDispatchedQueueSize() {
        return dispatched.size();
    }
552    
    /**
     * @return the running count of messages counted as dequeued by acknowledgments
     */
    @Override
    public long getDequeueCounter() {
        return dequeueCounter;
    }
557    
    /**
     * @return the current value of the dispatch counter
     */
    @Override
    public long getDispatchedCounter() {
        return dispatchCounter;
    }
562    
    /**
     * @return the number of messages added to this subscription, not counting the NULL_MESSAGE
     *         pull marker
     */
    @Override
    public long getEnqueueCounter() {
        return enqueueCounter;
    }
567    
    /**
     * @return whatever the pending cursor reports for {@code isRecoveryRequired()}
     */
    @Override
    public boolean isRecoveryRequired() {
        return pending.isRecoveryRequired();
    }
572    
    /**
     * @return the cursor currently holding this subscription's pending messages
     */
    public PendingMessageCursor getPending() {
        return this.pending;
    }
576    
577        public void setPending(PendingMessageCursor pending) {
578            this.pending = pending;
579            if (this.pending!=null) {
580                this.pending.setSystemUsage(usageManager);
581                this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
582            }
583        }
584    
    /**
     * Attaches a destination to this subscription: registers it with the superclass first and
     * then with the pending cursor, both while holding pendingLock.
     *
     * @param context connection context of the consumer
     * @param destination the destination being added
     */
    @Override
    public void add(ConnectionContext context, Destination destination) throws Exception {
        synchronized(pendingLock) {
            super.add(context, destination);
            pending.add(context, destination);
        }
    }
592    
593        @Override
594        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
595            List<MessageReference> rc = new ArrayList<MessageReference>();
596            synchronized(pendingLock) {
597                super.remove(context, destination);
598                // Here is a potential problem concerning Inflight stat:
599                // Messages not already committed or rolled back may not be removed from dispatched list at the moment
600                // Except if each commit or rollback callback action comes before remove of subscriber.
601                rc.addAll(pending.remove(context, destination));
602    
603                // Synchronized to DispatchLock
604                synchronized(dispatchLock) {
605                    ArrayList<MessageReference> references = new ArrayList<MessageReference>();
606                    for (MessageReference r : dispatched) {
607                        if( r.getRegionDestination() == destination) {
608                            references.add(r);
609                        }
610                    }
611                    rc.addAll(references);
612                    destination.getDestinationStatistics().getDispatched().subtract(references.size());
613                    destination.getDestinationStatistics().getInflight().subtract(references.size());
614                    dispatched.removeAll(references);
615                }
616            }
617            return rc;
618        }
619    
620        protected void dispatchPending() throws IOException {
621           synchronized(pendingLock) {
622                try {
623                    int numberToDispatch = countBeforeFull();
624                    if (numberToDispatch > 0) {
625                        setSlowConsumer(false);
626                        setPendingBatchSize(pending, numberToDispatch);
627                        int count = 0;
628                        pending.reset();
629                        while (pending.hasNext() && !isFull() && count < numberToDispatch) {
630                            MessageReference node = pending.next();
631                            if (node == null) {
632                                break;
633                            }
634    
635                            // Synchronize between dispatched list and remove of message from pending list
636                            // related to remove subscription action
637                            synchronized(dispatchLock) {
638                                pending.remove();
639                                node.decrementReferenceCount();
640                                if( !isDropped(node) && canDispatch(node)) {
641    
642                                    // Message may have been sitting in the pending
643                                    // list a while waiting for the consumer to ak the message.
644                                    if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
645                                        //increment number to dispatch
646                                        numberToDispatch++;
647                                        if (broker.isExpired(node)) {
648                                            ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
649                                        }
650                                        continue;
651                                    }
652                                    dispatch(node);
653                                    count++;
654                                }
655                            }
656                        }
657                    } else if (!isSlowConsumer()) {
658                        setSlowConsumer(true);
659                        for (Destination dest :destinations) {
660                            dest.slowConsumer(context, this);
661                        }
662                    }
663                } finally {
664                    pending.release();
665                }
666            }
667        }
668    
    /**
     * Applies the batch size for the upcoming dispatch pass to the given cursor by
     * forwarding {@code numberToDispatch} to {@code setMaxBatchSize}.
     * <p>
     * Called from {@code dispatchPending()} with the number of messages it intends to dispatch.
     *
     * @param pending the cursor whose maximum batch size is set
     * @param numberToDispatch the value passed to {@code setMaxBatchSize}
     */
    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
        pending.setMaxBatchSize(numberToDispatch);
    }
672    
673        // called with dispatchLock held
674        protected boolean dispatch(final MessageReference node) throws IOException {
675            final Message message = node.getMessage();
676            if (message == null) {
677                return false;
678            }
679    
680            okForAckAsDispatchDone.countDown();
681    
682            // No reentrant lock - Patch needed to IndirectMessageReference on method lock
683            MessageDispatch md = createMessageDispatch(node, message);
684            // NULL messages don't count... they don't get Acked.
685            if (node != QueueMessageReference.NULL_MESSAGE) {
686                dispatchCounter++;
687                dispatched.add(node);
688            } else {
689                while (true) {
690                    int currentExtension = prefetchExtension.get();
691                    int newExtension = Math.max(0, currentExtension - 1);
692                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
693                        break;
694                    }
695                }
696            }
697            if (info.isDispatchAsync()) {
698                md.setTransmitCallback(new TransmitCallback() {
699    
700                    @Override
701                    public void onSuccess() {
702                        // Since the message gets queued up in async dispatch, we don't want to
703                        // decrease the reference count until it gets put on the wire.
704                        onDispatch(node, message);
705                    }
706    
707                    @Override
708                    public void onFailure() {
709                        Destination nodeDest = (Destination) node.getRegionDestination();
710                        if (nodeDest != null) {
711                            if (node != QueueMessageReference.NULL_MESSAGE) {
712                                nodeDest.getDestinationStatistics().getDispatched().increment();
713                                nodeDest.getDestinationStatistics().getInflight().increment();
714                                if (LOG.isTraceEnabled()) {
715                                    LOG.trace(info.getConsumerId() + " failed to dispatch: " + message.getMessageId() + " - "
716                                            + message.getDestination()  + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
717                                }
718                            }
719                        }
720                    }
721                });
722                context.getConnection().dispatchAsync(md);
723            } else {
724                context.getConnection().dispatchSync(md);
725                onDispatch(node, message);
726            }
727            return true;
728        }
729    
730        protected void onDispatch(final MessageReference node, final Message message) {
731            Destination nodeDest = (Destination) node.getRegionDestination();
732            if (nodeDest != null) {
733                if (node != QueueMessageReference.NULL_MESSAGE) {
734                    nodeDest.getDestinationStatistics().getDispatched().increment();
735                    nodeDest.getDestinationStatistics().getInflight().increment();
736                    if (LOG.isTraceEnabled()) {
737                        LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
738                                + message.getDestination()  + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
739                    }
740                }
741            }
742    
743            if (info.isDispatchAsync()) {
744                try {
745                    dispatchPending();
746                } catch (IOException e) {
747                    context.getConnection().serviceExceptionAsync(e);
748                }
749            }
750        }
751    
752        /**
753         * inform the MessageConsumer on the client to change it's prefetch
754         *
755         * @param newPrefetch
756         */
757        @Override
758        public void updateConsumerPrefetch(int newPrefetch) {
759            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
760                ConsumerControl cc = new ConsumerControl();
761                cc.setConsumerId(info.getConsumerId());
762                cc.setPrefetch(newPrefetch);
763                context.getConnection().dispatchAsync(cc);
764            }
765        }
766    
767        /**
768         * @param node
769         * @param message
770         * @return MessageDispatch
771         */
772        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
773            MessageDispatch md = new MessageDispatch();
774            md.setConsumerId(info.getConsumerId());
775    
776            if (node == QueueMessageReference.NULL_MESSAGE) {
777                md.setMessage(null);
778                md.setDestination(null);
779            } else {
780                Destination regionDestination = (Destination) node.getRegionDestination();
781                md.setDestination(regionDestination.getActiveMQDestination());
782                md.setMessage(message);
783                md.setRedeliveryCounter(node.getRedeliveryCounter());
784            }
785    
786            return md;
787        }
788    
    /**
     * Use when a matched message is about to be dispatched to the client.
     * <p>
     * {@code dispatchPending()} calls this for each reference taken from the pending cursor,
     * after {@code isDropped(node)} has returned false, and only dispatches the reference
     * when this method returns true.
     *
     * @param node the reference that is about to be dispatched
     * @return false if the message should not be dispatched to the client
     *         (another sub may have already dispatched it for example).
     * @throws IOException if the implementation's check fails with an I/O error
     */
    protected abstract boolean canDispatch(MessageReference node) throws IOException;
798    
    /**
     * Tells whether a reference taken from the pending cursor must be skipped.
     * <p>
     * {@code dispatchPending()} checks this before {@code canDispatch(node)} and does not
     * dispatch the reference when it returns true.
     * NOTE(review): what exactly counts as "dropped" is decided by the implementing
     * subclass - confirm against the implementations.
     *
     * @param node the reference to check
     * @return true if the reference should not be dispatched
     */
    protected abstract boolean isDropped(MessageReference node);
800    
    /**
     * Used during acknowledgment to remove the message.
     *
     * @param context the connection context under which the acknowledgment is processed
     * @param ack the acknowledgment command being applied
     * @param node the reference of the message being acknowledged
     * @throws IOException if the implementation fails with an I/O error
     */
    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
807    
808    
809        public int getMaxProducersToAudit() {
810            return maxProducersToAudit;
811        }
812    
    /**
     * Stores a new value for the {@code maxProducersToAudit} setting.
     *
     * @param maxProducersToAudit the value to store
     */
    public void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
    }
816    
817        public int getMaxAuditDepth() {
818            return maxAuditDepth;
819        }
820    
    /**
     * Stores a new value for the {@code maxAuditDepth} setting.
     *
     * @param maxAuditDepth the value to store
     */
    public void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
    }
824    
825        public boolean isUsePrefetchExtension() {
826            return usePrefetchExtension;
827        }
828    
    /**
     * Stores a new value for the {@code usePrefetchExtension} flag.
     *
     * @param usePrefetchExtension the value to store
     */
    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
        this.usePrefetchExtension = usePrefetchExtension;
    }
832    
833        protected int getPrefetchExtension() {
834            return this.prefetchExtension.get();
835        }
836    
837        @Override
838        public void setPrefetchSize(int prefetchSize) {
839            this.info.setPrefetchSize(prefetchSize);
840            try {
841                this.dispatchPending();
842            } catch (Exception e) {
843                LOG.trace("Caught exception during dispatch after prefetch change.", e);
844            }
845        }
846    }