001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.concurrent.CountDownLatch;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicInteger;
026    
027    import javax.jms.JMSException;
028    
029    import org.apache.activemq.broker.Broker;
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
032    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
033    import org.apache.activemq.command.ConsumerControl;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.Message;
036    import org.apache.activemq.command.MessageAck;
037    import org.apache.activemq.command.MessageDispatch;
038    import org.apache.activemq.command.MessageDispatchNotification;
039    import org.apache.activemq.command.MessageId;
040    import org.apache.activemq.command.MessagePull;
041    import org.apache.activemq.command.Response;
042    import org.apache.activemq.thread.Scheduler;
043    import org.apache.activemq.transaction.Synchronization;
044    import org.apache.activemq.transport.TransmitCallback;
045    import org.apache.activemq.usage.SystemUsage;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * A subscription that honors the pre-fetch option of the ConsumerInfo.
051     */
052    public abstract class PrefetchSubscription extends AbstractSubscription {
053    
054        private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
055        protected final Scheduler scheduler;
056    
057        protected PendingMessageCursor pending;
058        protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
059        protected final AtomicInteger prefetchExtension = new AtomicInteger();
060        protected boolean usePrefetchExtension = true;
061        protected long enqueueCounter;
062        protected long dispatchCounter;
063        protected long dequeueCounter;
064        private int maxProducersToAudit=32;
065        private int maxAuditDepth=2048;
066        protected final SystemUsage usageManager;
067        protected final Object pendingLock = new Object();
068        protected final Object dispatchLock = new Object();
069        private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
070    
071        public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException {
072            super(broker,context, info);
073            this.usageManager=usageManager;
074            pending = cursor;
075            try {
076                pending.start();
077            } catch (Exception e) {
078                throw new JMSException(e.getMessage());
079            }
080            this.scheduler = broker.getScheduler();
081        }
082    
083        public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
084            this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
085        }
086    
087        /**
088         * Allows a message to be pulled on demand by a client
089         */
090        @Override
091        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
092            // The slave should not deliver pull messages.
093            // TODO: when the slave becomes a master, He should send a NULL message to all the
094            // consumers to 'wake them up' in case they were waiting for a message.
095            if (getPrefetchSize() == 0) {
096    
097                prefetchExtension.incrementAndGet();
098                final long dispatchCounterBeforePull = dispatchCounter;
099    
100                // Have the destination push us some messages.
101                for (Destination dest : destinations) {
102                    dest.iterate();
103                }
104                dispatchPending();
105    
106                synchronized(this) {
107                    // If there was nothing dispatched.. we may need to setup a timeout.
108                    if (dispatchCounterBeforePull == dispatchCounter) {
109                        // immediate timeout used by receiveNoWait()
110                        if (pull.getTimeout() == -1) {
111                            // Send a NULL message.
112                            add(QueueMessageReference.NULL_MESSAGE);
113                            dispatchPending();
114                        }
115                        if (pull.getTimeout() > 0) {
116                            scheduler.executeAfterDelay(new Runnable() {
117                                @Override
118                                public void run() {
119                                    pullTimeout(dispatchCounterBeforePull);
120                                }
121                            }, pull.getTimeout());
122                        }
123                    }
124                }
125            }
126            return null;
127        }
128    
129        /**
130         * Occurs when a pull times out. If nothing has been dispatched since the
131         * timeout was setup, then send the NULL message.
132         */
133        final void pullTimeout(long dispatchCounterBeforePull) {
134            synchronized (pendingLock) {
135                if (dispatchCounterBeforePull == dispatchCounter) {
136                    try {
137                        add(QueueMessageReference.NULL_MESSAGE);
138                        dispatchPending();
139                    } catch (Exception e) {
140                        context.getConnection().serviceException(e);
141                    }
142                }
143            }
144        }
145    
146        @Override
147        public void add(MessageReference node) throws Exception {
148            synchronized (pendingLock) {
149                // The destination may have just been removed...
150                if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
151                    // perhaps we should inform the caller that we are no longer valid to dispatch to?
152                    return;
153                }
154    
155                // Don't increment for the pullTimeout control message.
156                if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
157                    enqueueCounter++;
158                }
159                pending.addMessageLast(node);
160            }
161            dispatchPending();
162        }
163    
164        @Override
165        public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
166            synchronized(pendingLock) {
167                try {
168                    pending.reset();
169                    while (pending.hasNext()) {
170                        MessageReference node = pending.next();
171                        node.decrementReferenceCount();
172                        if (node.getMessageId().equals(mdn.getMessageId())) {
173                            // Synchronize between dispatched list and removal of messages from pending list
174                            // related to remove subscription action
175                            synchronized(dispatchLock) {
176                                pending.remove();
177                                createMessageDispatch(node, node.getMessage());
178                                dispatched.add(node);
179                                onDispatch(node, node.getMessage());
180                            }
181                            return;
182                        }
183                    }
184                } finally {
185                    pending.release();
186                }
187            }
188            throw new JMSException(
189                    "Slave broker out of sync with master: Dispatched message ("
190                            + mdn.getMessageId() + ") was not in the pending list for "
191                            + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
192        }
193    
194        @Override
195        public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
196            // Handle the standard acknowledgment case.
197            boolean callDispatchMatched = false;
198            Destination destination = null;
199    
200            if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
201                // suppress unexpected ack exception in this expected case
202                LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack);
203                return;
204            }
205    
206            LOG.trace("ack: {}", ack);
207    
208            synchronized(dispatchLock) {
209                if (ack.isStandardAck()) {
210                    // First check if the ack matches the dispatched. When using failover this might
211                    // not be the case. We don't ever want to ack the wrong messages.
212                    assertAckMatchesDispatched(ack);
213    
214                    // Acknowledge all dispatched messages up till the message id of
215                    // the acknowledgment.
216                    int index = 0;
217                    boolean inAckRange = false;
218                    List<MessageReference> removeList = new ArrayList<MessageReference>();
219                    for (final MessageReference node : dispatched) {
220                        MessageId messageId = node.getMessageId();
221                        if (ack.getFirstMessageId() == null
222                                || ack.getFirstMessageId().equals(messageId)) {
223                            inAckRange = true;
224                        }
225                        if (inAckRange) {
226                            // Don't remove the nodes until we are committed.
227                            if (!context.isInTransaction()) {
228                                dequeueCounter++;
229                                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
230                                removeList.add(node);
231                            } else {
232                                registerRemoveSync(context, node);
233                            }
234                            index++;
235                            acknowledge(context, ack, node);
236                            if (ack.getLastMessageId().equals(messageId)) {
237                                // contract prefetch if dispatch required a pull
238                                if (getPrefetchSize() == 0) {
239                                    // Protect extension update against parallel updates.
240                                    while (true) {
241                                        int currentExtension = prefetchExtension.get();
242                                        int newExtension = Math.max(0, currentExtension - index);
243                                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
244                                            break;
245                                        }
246                                    }
247                                } else if (usePrefetchExtension && context.isInTransaction()) {
248                                    // extend prefetch window only if not a pulling consumer
249                                    while (true) {
250                                        int currentExtension = prefetchExtension.get();
251                                        int newExtension = Math.max(currentExtension, index);
252                                        if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
253                                            break;
254                                        }
255                                    }
256                                }
257                                destination = (Destination) node.getRegionDestination();
258                                callDispatchMatched = true;
259                                break;
260                            }
261                        }
262                    }
263                    for (final MessageReference node : removeList) {
264                        dispatched.remove(node);
265                    }
266                    // this only happens after a reconnect - get an ack which is not
267                    // valid
268                    if (!callDispatchMatched) {
269                        LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack);
270                    }
271                } else if (ack.isIndividualAck()) {
272                    // Message was delivered and acknowledge - but only delete the
273                    // individual message
274                    for (final MessageReference node : dispatched) {
275                        MessageId messageId = node.getMessageId();
276                        if (ack.getLastMessageId().equals(messageId)) {
277                            // Don't remove the nodes until we are committed - immediateAck option
278                            if (!context.isInTransaction()) {
279                                dequeueCounter++;
280                                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
281                                dispatched.remove(node);
282                            } else {
283                                registerRemoveSync(context, node);
284                            }
285    
286                            // Protect extension update against parallel updates.
287                            while (true) {
288                                int currentExtension = prefetchExtension.get();
289                                int newExtension = Math.max(0, currentExtension - 1);
290                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
291                                    break;
292                                }
293                            }
294                            acknowledge(context, ack, node);
295                            destination = (Destination) node.getRegionDestination();
296                            callDispatchMatched = true;
297                            break;
298                        }
299                    }
300                }else if (ack.isDeliveredAck()) {
301                    // Message was delivered but not acknowledged: update pre-fetch
302                    // counters.
303                    int index = 0;
304                    for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
305                        final MessageReference node = iter.next();
306                        Destination nodeDest = (Destination) node.getRegionDestination();
307                        if (node.isExpired()) {
308                            if (broker.isExpired(node)) {
309                                Destination regionDestination = nodeDest;
310                                regionDestination.messageExpired(context, this, node);
311                            }
312                            iter.remove();
313                            nodeDest.getDestinationStatistics().getInflight().decrement();
314                        }
315                        if (ack.getLastMessageId().equals(node.getMessageId())) {
316                            if (usePrefetchExtension) {
317                                while (true) {
318                                    int currentExtension = prefetchExtension.get();
319                                    int newExtension = Math.max(currentExtension, index + 1);
320                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
321                                        break;
322                                    }
323                                }
324                            }
325                            destination = nodeDest;
326                            callDispatchMatched = true;
327                            break;
328                        }
329                    }
330                    if (!callDispatchMatched) {
331                        throw new JMSException(
332                                "Could not correlate acknowledgment with dispatched message: "
333                                        + ack);
334                    }
335                } else if (ack.isRedeliveredAck()) {
336                    // Message was re-delivered but it was not yet considered to be
337                    // a DLQ message.
338                    boolean inAckRange = false;
339                    for (final MessageReference node : dispatched) {
340                        MessageId messageId = node.getMessageId();
341                        if (ack.getFirstMessageId() == null
342                                || ack.getFirstMessageId().equals(messageId)) {
343                            inAckRange = true;
344                        }
345                        if (inAckRange) {
346                            if (ack.getLastMessageId().equals(messageId)) {
347                                destination = (Destination) node.getRegionDestination();
348                                callDispatchMatched = true;
349                                break;
350                            }
351                        }
352                    }
353                    if (!callDispatchMatched) {
354                        throw new JMSException(
355                                "Could not correlate acknowledgment with dispatched message: "
356                                        + ack);
357                    }
358                } else if (ack.isPoisonAck()) {
359                    // TODO: what if the message is already in a DLQ???
360                    // Handle the poison ACK case: we need to send the message to a
361                    // DLQ
362                    if (ack.isInTransaction()) {
363                        throw new JMSException("Poison ack cannot be transacted: "
364                                + ack);
365                    }
366                    int index = 0;
367                    boolean inAckRange = false;
368                    List<MessageReference> removeList = new ArrayList<MessageReference>();
369                    for (final MessageReference node : dispatched) {
370                        MessageId messageId = node.getMessageId();
371                        if (ack.getFirstMessageId() == null
372                                || ack.getFirstMessageId().equals(messageId)) {
373                            inAckRange = true;
374                        }
375                        if (inAckRange) {
376                            sendToDLQ(context, node, ack.getPoisonCause());
377                            Destination nodeDest = (Destination) node.getRegionDestination();
378                            nodeDest.getDestinationStatistics()
379                                    .getInflight().decrement();
380                            removeList.add(node);
381                            dequeueCounter++;
382                            index++;
383                            acknowledge(context, ack, node);
384                            if (ack.getLastMessageId().equals(messageId)) {
385                                while (true) {
386                                    int currentExtension = prefetchExtension.get();
387                                    int newExtension = Math.max(0, currentExtension - (index + 1));
388                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
389                                        break;
390                                    }
391                                }
392                                destination = nodeDest;
393                                callDispatchMatched = true;
394                                break;
395                            }
396                        }
397                    }
398                    for (final MessageReference node : removeList) {
399                        dispatched.remove(node);
400                    }
401                    if (!callDispatchMatched) {
402                        throw new JMSException(
403                                "Could not correlate acknowledgment with dispatched message: "
404                                        + ack);
405                    }
406                }
407            }
408            if (callDispatchMatched && destination != null) {
409                destination.wakeup();
410                dispatchPending();
411    
412                if (pending.isEmpty()) {
413                    for (Destination dest : destinations) {
414                        dest.wakeup();
415                    }
416                }
417            } else {
418                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
419            }
420        }
421    
422        private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
423            // setup a Synchronization to remove nodes from the
424            // dispatched list.
425            context.getTransaction().addSynchronization(
426                    new Synchronization() {
427    
428                        @Override
429                        public void afterCommit()
430                                throws Exception {
431                            Destination nodeDest = (Destination) node.getRegionDestination();
432                            synchronized(dispatchLock) {
433                                dequeueCounter++;
434                                dispatched.remove(node);
435                                nodeDest.getDestinationStatistics().getInflight().decrement();
436                            }
437                            nodeDest.wakeup();
438                            dispatchPending();
439                        }
440    
441                        @Override
442                        public void afterRollback() throws Exception {
443                            synchronized(dispatchLock) {
444                                // poisionAck will decrement - otherwise still inflight on client
445                            }
446                        }
447                    });
448        }
449    
450        /**
451         * Checks an ack versus the contents of the dispatched list.
452         *  called with dispatchLock held
453         * @param ack
454         * @throws JMSException if it does not match
455         */
456        protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
457            MessageId firstAckedMsg = ack.getFirstMessageId();
458            MessageId lastAckedMsg = ack.getLastMessageId();
459            int checkCount = 0;
460            boolean checkFoundStart = false;
461            boolean checkFoundEnd = false;
462            for (MessageReference node : dispatched) {
463    
464                if (firstAckedMsg == null) {
465                    checkFoundStart = true;
466                } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
467                    checkFoundStart = true;
468                }
469    
470                if (checkFoundStart) {
471                    checkCount++;
472                }
473    
474                if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
475                    checkFoundEnd = true;
476                    break;
477                }
478            }
479            if (!checkFoundStart && firstAckedMsg != null)
480                throw new JMSException("Unmatched acknowledge: " + ack
481                        + "; Could not find Message-ID " + firstAckedMsg
482                        + " in dispatched-list (start of ack)");
483            if (!checkFoundEnd && lastAckedMsg != null)
484                throw new JMSException("Unmatched acknowledge: " + ack
485                        + "; Could not find Message-ID " + lastAckedMsg
486                        + " in dispatched-list (end of ack)");
487            if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
488                throw new JMSException("Unmatched acknowledge: " + ack
489                        + "; Expected message count (" + ack.getMessageCount()
490                        + ") differs from count in dispatched-list (" + checkCount
491                        + ")");
492            }
493        }
494    
495        /**
496         *
497         * @param context
498         * @param node
499         * @param poisonCause
500         * @throws IOException
501         * @throws Exception
502         */
503        protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception {
504            broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause);
505        }
506    
507        @Override
508        public int getInFlightSize() {
509            return dispatched.size();
510        }
511    
512        /**
513         * Used to determine if the broker can dispatch to the consumer.
514         *
515         * @return
516         */
517        @Override
518        public boolean isFull() {
519            return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
520        }
521    
522        /**
523         * @return true when 60% or more room is left for dispatching messages
524         */
525        @Override
526        public boolean isLowWaterMark() {
527            return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
528        }
529    
530        /**
531         * @return true when 10% or less room is left for dispatching messages
532         */
533        @Override
534        public boolean isHighWaterMark() {
535            return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
536        }
537    
538        @Override
539        public int countBeforeFull() {
540            return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
541        }
542    
543        @Override
544        public int getPendingQueueSize() {
545            return pending.size();
546        }
547    
548        @Override
549        public int getDispatchedQueueSize() {
550            return dispatched.size();
551        }
552    
553        @Override
554        public long getDequeueCounter() {
555            return dequeueCounter;
556        }
557    
558        @Override
559        public long getDispatchedCounter() {
560            return dispatchCounter;
561        }
562    
563        @Override
564        public long getEnqueueCounter() {
565            return enqueueCounter;
566        }
567    
568        @Override
569        public boolean isRecoveryRequired() {
570            return pending.isRecoveryRequired();
571        }
572    
573        public PendingMessageCursor getPending() {
574            return this.pending;
575        }
576    
577        public void setPending(PendingMessageCursor pending) {
578            this.pending = pending;
579            if (this.pending!=null) {
580                this.pending.setSystemUsage(usageManager);
581                this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
582            }
583        }
584    
585        @Override
586        public void add(ConnectionContext context, Destination destination) throws Exception {
587            synchronized(pendingLock) {
588                super.add(context, destination);
589                pending.add(context, destination);
590            }
591        }
592    
593        @Override
594        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
595            return remove(context, destination, dispatched);
596        }
597    
598        public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
599            List<MessageReference> rc = new ArrayList<MessageReference>();
600            synchronized(pendingLock) {
601                super.remove(context, destination);
602                // Here is a potential problem concerning Inflight stat:
603                // Messages not already committed or rolled back may not be removed from dispatched list at the moment
604                // Except if each commit or rollback callback action comes before remove of subscriber.
605                rc.addAll(pending.remove(context, destination));
606    
607                if (dispatched == null) {
608                    return rc;
609                }
610    
611                // Synchronized to DispatchLock if necessary
612                if (dispatched == this.dispatched) {
613                    synchronized(dispatchLock) {
614                        updateDestinationStats(rc, destination, dispatched);
615                    }
616                } else {
617                    updateDestinationStats(rc, destination, dispatched);
618                }
619            }
620            return rc;
621        }
622    
623        private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
624            ArrayList<MessageReference> references = new ArrayList<MessageReference>();
625            for (MessageReference r : dispatched) {
626                if (r.getRegionDestination() == destination) {
627                    references.add(r);
628                }
629            }
630            rc.addAll(references);
631            destination.getDestinationStatistics().getDispatched().subtract(references.size());
632            destination.getDestinationStatistics().getInflight().subtract(references.size());
633            dispatched.removeAll(references);
634        }
635    
636        protected void dispatchPending() throws IOException {
637           synchronized(pendingLock) {
638                try {
639                    int numberToDispatch = countBeforeFull();
640                    if (numberToDispatch > 0) {
641                        setSlowConsumer(false);
642                        setPendingBatchSize(pending, numberToDispatch);
643                        int count = 0;
644                        pending.reset();
645                        while (pending.hasNext() && !isFull() && count < numberToDispatch) {
646                            MessageReference node = pending.next();
647                            if (node == null) {
648                                break;
649                            }
650    
651                            // Synchronize between dispatched list and remove of message from pending list
652                            // related to remove subscription action
653                            synchronized(dispatchLock) {
654                                pending.remove();
655                                node.decrementReferenceCount();
656                                if( !isDropped(node) && canDispatch(node)) {
657    
658                                    // Message may have been sitting in the pending
659                                    // list a while waiting for the consumer to ak the message.
660                                    if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
661                                        //increment number to dispatch
662                                        numberToDispatch++;
663                                        if (broker.isExpired(node)) {
664                                            ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
665                                        }
666                                        continue;
667                                    }
668                                    dispatch(node);
669                                    count++;
670                                }
671                            }
672                        }
673                    } else if (!isSlowConsumer()) {
674                        setSlowConsumer(true);
675                        for (Destination dest :destinations) {
676                            dest.slowConsumer(context, this);
677                        }
678                    }
679                } finally {
680                    pending.release();
681                }
682            }
683        }
684    
685        protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
686            pending.setMaxBatchSize(numberToDispatch);
687        }
688    
689        // called with dispatchLock held
690        protected boolean dispatch(final MessageReference node) throws IOException {
691            final Message message = node.getMessage();
692            if (message == null) {
693                return false;
694            }
695    
696            okForAckAsDispatchDone.countDown();
697    
698            // No reentrant lock - Patch needed to IndirectMessageReference on method lock
699            MessageDispatch md = createMessageDispatch(node, message);
700            // NULL messages don't count... they don't get Acked.
701            if (node != QueueMessageReference.NULL_MESSAGE) {
702                dispatchCounter++;
703                dispatched.add(node);
704            } else {
705                while (true) {
706                    int currentExtension = prefetchExtension.get();
707                    int newExtension = Math.max(0, currentExtension - 1);
708                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
709                        break;
710                    }
711                }
712            }
713            if (info.isDispatchAsync()) {
714                md.setTransmitCallback(new TransmitCallback() {
715    
716                    @Override
717                    public void onSuccess() {
718                        // Since the message gets queued up in async dispatch, we don't want to
719                        // decrease the reference count until it gets put on the wire.
720                        onDispatch(node, message);
721                    }
722    
723                    @Override
724                    public void onFailure() {
725                        Destination nodeDest = (Destination) node.getRegionDestination();
726                        if (nodeDest != null) {
727                            if (node != QueueMessageReference.NULL_MESSAGE) {
728                                nodeDest.getDestinationStatistics().getDispatched().increment();
729                                nodeDest.getDestinationStatistics().getInflight().increment();
730                                LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
731                            }
732                        }
733                    }
734                });
735                context.getConnection().dispatchAsync(md);
736            } else {
737                context.getConnection().dispatchSync(md);
738                onDispatch(node, message);
739            }
740            return true;
741        }
742    
743        protected void onDispatch(final MessageReference node, final Message message) {
744            Destination nodeDest = (Destination) node.getRegionDestination();
745            if (nodeDest != null) {
746                if (node != QueueMessageReference.NULL_MESSAGE) {
747                    nodeDest.getDestinationStatistics().getDispatched().increment();
748                    nodeDest.getDestinationStatistics().getInflight().increment();
749                    LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
750                }
751            }
752    
753            if (info.isDispatchAsync()) {
754                try {
755                    dispatchPending();
756                } catch (IOException e) {
757                    context.getConnection().serviceExceptionAsync(e);
758                }
759            }
760        }
761    
762        /**
763         * inform the MessageConsumer on the client to change it's prefetch
764         *
765         * @param newPrefetch
766         */
767        @Override
768        public void updateConsumerPrefetch(int newPrefetch) {
769            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
770                ConsumerControl cc = new ConsumerControl();
771                cc.setConsumerId(info.getConsumerId());
772                cc.setPrefetch(newPrefetch);
773                context.getConnection().dispatchAsync(cc);
774            }
775        }
776    
777        /**
778         * @param node
779         * @param message
780         * @return MessageDispatch
781         */
782        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
783            MessageDispatch md = new MessageDispatch();
784            md.setConsumerId(info.getConsumerId());
785    
786            if (node == QueueMessageReference.NULL_MESSAGE) {
787                md.setMessage(null);
788                md.setDestination(null);
789            } else {
790                Destination regionDestination = (Destination) node.getRegionDestination();
791                md.setDestination(regionDestination.getActiveMQDestination());
792                md.setMessage(message);
793                md.setRedeliveryCounter(node.getRedeliveryCounter());
794            }
795    
796            return md;
797        }
798    
799        /**
800         * Use when a matched message is about to be dispatched to the client.
801         *
802         * @param node
803         * @return false if the message should not be dispatched to the client
804         *         (another sub may have already dispatched it for example).
805         * @throws IOException
806         */
807        protected abstract boolean canDispatch(MessageReference node) throws IOException;
808    
809        protected abstract boolean isDropped(MessageReference node);
810    
811        /**
812         * Used during acknowledgment to remove the message.
813         *
814         * @throws IOException
815         */
816        protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
817    
818    
819        public int getMaxProducersToAudit() {
820            return maxProducersToAudit;
821        }
822    
823        public void setMaxProducersToAudit(int maxProducersToAudit) {
824            this.maxProducersToAudit = maxProducersToAudit;
825            if (this.pending != null) {
826                this.pending.setMaxProducersToAudit(maxProducersToAudit);
827            }
828        }
829    
830        public int getMaxAuditDepth() {
831            return maxAuditDepth;
832        }
833    
834        public void setMaxAuditDepth(int maxAuditDepth) {
835            this.maxAuditDepth = maxAuditDepth;
836            if (this.pending != null) {
837                this.pending.setMaxAuditDepth(maxAuditDepth);
838            }
839        }
840    
841        public boolean isUsePrefetchExtension() {
842            return usePrefetchExtension;
843        }
844    
845        public void setUsePrefetchExtension(boolean usePrefetchExtension) {
846            this.usePrefetchExtension = usePrefetchExtension;
847        }
848    
849        protected int getPrefetchExtension() {
850            return this.prefetchExtension.get();
851        }
852    
853        @Override
854        public void setPrefetchSize(int prefetchSize) {
855            this.info.setPrefetchSize(prefetchSize);
856            try {
857                this.dispatchPending();
858            } catch (Exception e) {
859                LOG.trace("Caught exception during dispatch after prefetch change.", e);
860            }
861        }
862    }