001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.concurrent.CountDownLatch;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicInteger;
026
027 import javax.jms.InvalidSelectorException;
028 import javax.jms.JMSException;
029
030 import org.apache.activemq.broker.Broker;
031 import org.apache.activemq.broker.ConnectionContext;
032 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
034 import org.apache.activemq.command.ActiveMQMessage;
035 import org.apache.activemq.command.ConsumerControl;
036 import org.apache.activemq.command.ConsumerInfo;
037 import org.apache.activemq.command.Message;
038 import org.apache.activemq.command.MessageAck;
039 import org.apache.activemq.command.MessageDispatch;
040 import org.apache.activemq.command.MessageDispatchNotification;
041 import org.apache.activemq.command.MessageId;
042 import org.apache.activemq.command.MessagePull;
043 import org.apache.activemq.command.Response;
044 import org.apache.activemq.thread.Scheduler;
045 import org.apache.activemq.transaction.Synchronization;
046 import org.apache.activemq.usage.SystemUsage;
047 import org.slf4j.Logger;
048 import org.slf4j.LoggerFactory;
049
050 /**
051 * A subscription that honors the pre-fetch option of the ConsumerInfo.
052 */
053 public abstract class PrefetchSubscription extends AbstractSubscription {
054
055 private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
056 protected final Scheduler scheduler;
057
058 protected PendingMessageCursor pending;
059 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
060 protected final AtomicInteger prefetchExtension = new AtomicInteger();
061 protected boolean usePrefetchExtension = true;
062 protected long enqueueCounter;
063 protected long dispatchCounter;
064 protected long dequeueCounter;
065 private int maxProducersToAudit=32;
066 private int maxAuditDepth=2048;
067 protected final SystemUsage usageManager;
068 protected final Object pendingLock = new Object();
069 protected final Object dispatchLock = new Object();
070 private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
071
/**
 * Creates a subscription that buffers not-yet-dispatched messages in the supplied cursor.
 *
 * @param broker       the owning broker; also supplies the scheduler used for pull timeouts
 * @param usageManager usage manager later handed to the pending cursor
 * @param context      connection context of the consumer
 * @param info         consumer description (prefetch size, dispatch mode, ...)
 * @param cursor       cursor holding pending messages
 * @throws InvalidSelectorException propagated from the superclass constructor
 */
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
    super(broker, context, info);
    this.scheduler = broker.getScheduler();
    this.usageManager = usageManager;
    this.pending = cursor;
}
078
/**
 * Creates a subscription backed by a new {@link VMPendingMessageCursor} constructed with {@code false}.
 *
 * @param broker       the owning broker
 * @param usageManager usage manager later handed to the pending cursor
 * @param context      connection context of the consumer
 * @param info         consumer description
 * @throws InvalidSelectorException propagated from the superclass constructor
 */
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
    this(broker, usageManager, context, info, new VMPendingMessageCursor(false));
}
082
083 /**
084 * Allows a message to be pulled on demand by a client
085 */
086 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
087 // The slave should not deliver pull messages.
088 // TODO: when the slave becomes a master, He should send a NULL message to all the
089 // consumers to 'wake them up' in case they were waiting for a message.
090 if (getPrefetchSize() == 0 && !isSlave()) {
091
092 prefetchExtension.incrementAndGet();
093 final long dispatchCounterBeforePull = dispatchCounter;
094
095 // Have the destination push us some messages.
096 for (Destination dest : destinations) {
097 dest.iterate();
098 }
099 dispatchPending();
100
101 synchronized(this) {
102 // If there was nothing dispatched.. we may need to setup a timeout.
103 if (dispatchCounterBeforePull == dispatchCounter) {
104 // immediate timeout used by receiveNoWait()
105 if (pull.getTimeout() == -1) {
106 // Send a NULL message.
107 add(QueueMessageReference.NULL_MESSAGE);
108 dispatchPending();
109 }
110 if (pull.getTimeout() > 0) {
111 scheduler.executeAfterDelay(new Runnable() {
112 @Override
113 public void run() {
114 pullTimeout(dispatchCounterBeforePull);
115 }
116 }, pull.getTimeout());
117 }
118 }
119 }
120 }
121 return null;
122 }
123
124 /**
125 * Occurs when a pull times out. If nothing has been dispatched since the
126 * timeout was setup, then send the NULL message.
127 */
128 final void pullTimeout(long dispatchCounterBeforePull) {
129 synchronized (pendingLock) {
130 if (dispatchCounterBeforePull == dispatchCounter) {
131 try {
132 add(QueueMessageReference.NULL_MESSAGE);
133 dispatchPending();
134 } catch (Exception e) {
135 context.getConnection().serviceException(e);
136 }
137 }
138 }
139 }
140
141 public void add(MessageReference node) throws Exception {
142 synchronized (pendingLock) {
143 // The destination may have just been removed...
144 if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
145 // perhaps we should inform the caller that we are no longer valid to dispatch to?
146 return;
147 }
148
149 // Don't increment for the pullTimeout control message.
150 if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
151 enqueueCounter++;
152 }
153 pending.addMessageLast(node);
154 }
155 dispatchPending();
156 }
157
158 public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
159 synchronized(pendingLock) {
160 try {
161 pending.reset();
162 while (pending.hasNext()) {
163 MessageReference node = pending.next();
164 node.decrementReferenceCount();
165 if (node.getMessageId().equals(mdn.getMessageId())) {
166 // Synchronize between dispatched list and removal of messages from pending list
167 // related to remove subscription action
168 synchronized(dispatchLock) {
169 pending.remove();
170 createMessageDispatch(node, node.getMessage());
171 dispatched.add(node);
172 onDispatch(node, node.getMessage());
173 }
174 return;
175 }
176 }
177 } finally {
178 pending.release();
179 }
180 }
181 throw new JMSException(
182 "Slave broker out of sync with master: Dispatched message ("
183 + mdn.getMessageId() + ") was not in the pending list for "
184 + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
185 }
186
187 public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
188 // Handle the standard acknowledgment case.
189 boolean callDispatchMatched = false;
190 Destination destination = null;
191
192 if (!isSlave()) {
193 if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
194 // suppress unexpected ack exception in this expected case
195 LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
196 return;
197 }
198 }
199 if (LOG.isTraceEnabled()) {
200 LOG.trace("ack:" + ack);
201 }
202 synchronized(dispatchLock) {
203 if (ack.isStandardAck()) {
204 // First check if the ack matches the dispatched. When using failover this might
205 // not be the case. We don't ever want to ack the wrong messages.
206 assertAckMatchesDispatched(ack);
207
208 // Acknowledge all dispatched messages up till the message id of
209 // the acknowledgment.
210 int index = 0;
211 boolean inAckRange = false;
212 List<MessageReference> removeList = new ArrayList<MessageReference>();
213 for (final MessageReference node : dispatched) {
214 MessageId messageId = node.getMessageId();
215 if (ack.getFirstMessageId() == null
216 || ack.getFirstMessageId().equals(messageId)) {
217 inAckRange = true;
218 }
219 if (inAckRange) {
220 // Don't remove the nodes until we are committed.
221 if (!context.isInTransaction()) {
222 dequeueCounter++;
223 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
224 removeList.add(node);
225 } else {
226 registerRemoveSync(context, node);
227 }
228 index++;
229 acknowledge(context, ack, node);
230 if (ack.getLastMessageId().equals(messageId)) {
231 // contract prefetch if dispatch required a pull
232 if (getPrefetchSize() == 0) {
233 // Protect extension update against parallel updates.
234 while (true) {
235 int currentExtension = prefetchExtension.get();
236 int newExtension = Math.max(0, currentExtension - index);
237 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
238 break;
239 }
240 }
241 } else if (usePrefetchExtension && context.isInTransaction()) {
242 // extend prefetch window only if not a pulling consumer
243 while (true) {
244 int currentExtension = prefetchExtension.get();
245 int newExtension = Math.max(currentExtension, index);
246 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
247 break;
248 }
249 }
250 }
251 destination = node.getRegionDestination();
252 callDispatchMatched = true;
253 break;
254 }
255 }
256 }
257 for (final MessageReference node : removeList) {
258 dispatched.remove(node);
259 }
260 // this only happens after a reconnect - get an ack which is not
261 // valid
262 if (!callDispatchMatched) {
263 LOG.warn("Could not correlate acknowledgment with dispatched message: "
264 + ack);
265 }
266 } else if (ack.isIndividualAck()) {
267 // Message was delivered and acknowledge - but only delete the
268 // individual message
269 for (final MessageReference node : dispatched) {
270 MessageId messageId = node.getMessageId();
271 if (ack.getLastMessageId().equals(messageId)) {
272 // Don't remove the nodes until we are committed - immediateAck option
273 if (!context.isInTransaction()) {
274 dequeueCounter++;
275 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
276 dispatched.remove(node);
277 } else {
278 registerRemoveSync(context, node);
279 }
280
281 // Protect extension update against parallel updates.
282 while (true) {
283 int currentExtension = prefetchExtension.get();
284 int newExtension = Math.max(0, currentExtension - 1);
285 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
286 break;
287 }
288 }
289 acknowledge(context, ack, node);
290 destination = node.getRegionDestination();
291 callDispatchMatched = true;
292 break;
293 }
294 }
295 }else if (ack.isDeliveredAck()) {
296 // Message was delivered but not acknowledged: update pre-fetch
297 // counters.
298 int index = 0;
299 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
300 final MessageReference node = iter.next();
301 if (node.isExpired()) {
302 if (broker.isExpired(node)) {
303 node.getRegionDestination().messageExpired(context, this, node);
304 }
305 iter.remove();
306 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
307 }
308 if (ack.getLastMessageId().equals(node.getMessageId())) {
309 if (usePrefetchExtension) {
310 while (true) {
311 int currentExtension = prefetchExtension.get();
312 int newExtension = Math.max(currentExtension, index + 1);
313 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
314 break;
315 }
316 }
317 }
318 destination = node.getRegionDestination();
319 callDispatchMatched = true;
320 break;
321 }
322 }
323 if (!callDispatchMatched) {
324 throw new JMSException(
325 "Could not correlate acknowledgment with dispatched message: "
326 + ack);
327 }
328 } else if (ack.isRedeliveredAck()) {
329 // Message was re-delivered but it was not yet considered to be
330 // a DLQ message.
331 boolean inAckRange = false;
332 for (final MessageReference node : dispatched) {
333 MessageId messageId = node.getMessageId();
334 if (ack.getFirstMessageId() == null
335 || ack.getFirstMessageId().equals(messageId)) {
336 inAckRange = true;
337 }
338 if (inAckRange) {
339 if (ack.getLastMessageId().equals(messageId)) {
340 destination = node.getRegionDestination();
341 callDispatchMatched = true;
342 break;
343 }
344 }
345 }
346 if (!callDispatchMatched) {
347 throw new JMSException(
348 "Could not correlate acknowledgment with dispatched message: "
349 + ack);
350 }
351 } else if (ack.isPoisonAck()) {
352 // TODO: what if the message is already in a DLQ???
353 // Handle the poison ACK case: we need to send the message to a
354 // DLQ
355 if (ack.isInTransaction()) {
356 throw new JMSException("Poison ack cannot be transacted: "
357 + ack);
358 }
359 int index = 0;
360 boolean inAckRange = false;
361 List<MessageReference> removeList = new ArrayList<MessageReference>();
362 for (final MessageReference node : dispatched) {
363 MessageId messageId = node.getMessageId();
364 if (ack.getFirstMessageId() == null
365 || ack.getFirstMessageId().equals(messageId)) {
366 inAckRange = true;
367 }
368 if (inAckRange) {
369 if (ack.getPoisonCause() != null) {
370 node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
371 ack.getPoisonCause().toString());
372 }
373 sendToDLQ(context, node);
374 node.getRegionDestination().getDestinationStatistics()
375 .getInflight().decrement();
376 removeList.add(node);
377 dequeueCounter++;
378 index++;
379 acknowledge(context, ack, node);
380 if (ack.getLastMessageId().equals(messageId)) {
381 while (true) {
382 int currentExtension = prefetchExtension.get();
383 int newExtension = Math.max(0, currentExtension - (index + 1));
384 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
385 break;
386 }
387 }
388 destination = node.getRegionDestination();
389 callDispatchMatched = true;
390 break;
391 }
392 }
393 }
394 for (final MessageReference node : removeList) {
395 dispatched.remove(node);
396 }
397 if (!callDispatchMatched) {
398 throw new JMSException(
399 "Could not correlate acknowledgment with dispatched message: "
400 + ack);
401 }
402 }
403 }
404 if (callDispatchMatched && destination != null) {
405 destination.wakeup();
406 dispatchPending();
407 } else {
408 if (isSlave()) {
409 throw new JMSException(
410 "Slave broker out of sync with master: Acknowledgment ("
411 + ack + ") was not in the dispatch list: "
412 + dispatched);
413 } else {
414 LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
415 + ack);
416 }
417 }
418 }
419
420 private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
421 // setup a Synchronization to remove nodes from the
422 // dispatched list.
423 context.getTransaction().addSynchronization(
424 new Synchronization() {
425
426 @Override
427 public void afterCommit()
428 throws Exception {
429 synchronized(dispatchLock) {
430 dequeueCounter++;
431 dispatched.remove(node);
432 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
433 }
434 node.getRegionDestination().wakeup();
435 dispatchPending();
436 }
437
438 @Override
439 public void afterRollback() throws Exception {
440 synchronized(dispatchLock) {
441 if (isSlave()) {
442 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
443 } else {
444 // poisionAck will decrement - otherwise still inflight on client
445 }
446 }
447 }
448 });
449 }
450
451 /**
452 * Checks an ack versus the contents of the dispatched list.
453 * called with dispatchLock held
454 * @param ack
455 * @throws JMSException if it does not match
456 */
457 protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
458 MessageId firstAckedMsg = ack.getFirstMessageId();
459 MessageId lastAckedMsg = ack.getLastMessageId();
460 int checkCount = 0;
461 boolean checkFoundStart = false;
462 boolean checkFoundEnd = false;
463 for (MessageReference node : dispatched) {
464
465 if (firstAckedMsg == null) {
466 checkFoundStart = true;
467 } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
468 checkFoundStart = true;
469 }
470
471 if (checkFoundStart) {
472 checkCount++;
473 }
474
475 if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
476 checkFoundEnd = true;
477 break;
478 }
479 }
480 if (!checkFoundStart && firstAckedMsg != null)
481 throw new JMSException("Unmatched acknowledge: " + ack
482 + "; Could not find Message-ID " + firstAckedMsg
483 + " in dispatched-list (start of ack)");
484 if (!checkFoundEnd && lastAckedMsg != null)
485 throw new JMSException("Unmatched acknowledge: " + ack
486 + "; Could not find Message-ID " + lastAckedMsg
487 + " in dispatched-list (end of ack)");
488 if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
489 throw new JMSException("Unmatched acknowledge: " + ack
490 + "; Expected message count (" + ack.getMessageCount()
491 + ") differs from count in dispatched-list (" + checkCount
492 + ")");
493 }
494 }
495
496 /**
497 * @param context
498 * @param node
499 * @throws IOException
500 * @throws Exception
501 */
502 protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
503 broker.getRoot().sendToDeadLetterQueue(context, node, this);
504 }
505
506 public int getInFlightSize() {
507 return dispatched.size();
508 }
509
510 /**
511 * Used to determine if the broker can dispatch to the consumer.
512 *
513 * @return
514 */
515 public boolean isFull() {
516 return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
517 }
518
519 /**
520 * @return true when 60% or more room is left for dispatching messages
521 */
522 public boolean isLowWaterMark() {
523 return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
524 }
525
526 /**
527 * @return true when 10% or less room is left for dispatching messages
528 */
529 public boolean isHighWaterMark() {
530 return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
531 }
532
533 @Override
534 public int countBeforeFull() {
535 return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
536 }
537
538 public int getPendingQueueSize() {
539 return pending.size();
540 }
541
542 public int getDispatchedQueueSize() {
543 return dispatched.size();
544 }
545
546 public long getDequeueCounter() {
547 return dequeueCounter;
548 }
549
550 public long getDispatchedCounter() {
551 return dispatchCounter;
552 }
553
554 public long getEnqueueCounter() {
555 return enqueueCounter;
556 }
557
558 @Override
559 public boolean isRecoveryRequired() {
560 return pending.isRecoveryRequired();
561 }
562
563 public PendingMessageCursor getPending() {
564 return this.pending;
565 }
566
567 public void setPending(PendingMessageCursor pending) {
568 this.pending = pending;
569 if (this.pending!=null) {
570 this.pending.setSystemUsage(usageManager);
571 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
572 }
573 }
574
575 @Override
576 public void add(ConnectionContext context, Destination destination) throws Exception {
577 synchronized(pendingLock) {
578 super.add(context, destination);
579 pending.add(context, destination);
580 }
581 }
582
583 @Override
584 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
585 List<MessageReference> rc = new ArrayList<MessageReference>();
586 synchronized(pendingLock) {
587 super.remove(context, destination);
588 // Here is a potential problem concerning Inflight stat:
589 // Messages not already committed or rolled back may not be removed from dispatched list at the moment
590 // Except if each commit or rollback callback action comes before remove of subscriber.
591 rc.addAll(pending.remove(context, destination));
592
593 // Synchronized to DispatchLock
594 synchronized(dispatchLock) {
595 ArrayList<MessageReference> references = new ArrayList<MessageReference>();
596 for (MessageReference r : dispatched) {
597 if( r.getRegionDestination() == destination) {
598 references.add(r);
599 }
600 }
601 rc.addAll(references);
602 destination.getDestinationStatistics().getDispatched().subtract(references.size());
603 destination.getDestinationStatistics().getInflight().subtract(references.size());
604 dispatched.removeAll(references);
605 }
606 }
607 return rc;
608 }
609
610 protected void dispatchPending() throws IOException {
611 if (!isSlave()) {
612 synchronized(pendingLock) {
613 try {
614 int numberToDispatch = countBeforeFull();
615 if (numberToDispatch > 0) {
616 setSlowConsumer(false);
617 setPendingBatchSize(pending, numberToDispatch);
618 int count = 0;
619 pending.reset();
620 while (pending.hasNext() && !isFull()
621 && count < numberToDispatch) {
622 MessageReference node = pending.next();
623 if (node == null) {
624 break;
625 }
626
627 // Synchronize between dispatched list and remove of message from pending list
628 // related to remove subscription action
629 synchronized(dispatchLock) {
630 pending.remove();
631 node.decrementReferenceCount();
632 if( !isDropped(node) && canDispatch(node)) {
633
634 // Message may have been sitting in the pending
635 // list a while waiting for the consumer to ak the message.
636 if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
637 //increment number to dispatch
638 numberToDispatch++;
639 if (broker.isExpired(node)) {
640 node.getRegionDestination().messageExpired(context, this, node);
641 }
642 continue;
643 }
644 dispatch(node);
645 count++;
646 }
647 }
648 }
649 } else if (!isSlowConsumer()) {
650 setSlowConsumer(true);
651 for (Destination dest :destinations) {
652 dest.slowConsumer(context, this);
653 }
654 }
655 } finally {
656 pending.release();
657 }
658 }
659 }
660 }
661
662 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
663 pending.setMaxBatchSize(numberToDispatch);
664 }
665
666 // called with dispatchLock held
667 protected boolean dispatch(final MessageReference node) throws IOException {
668 final Message message = node.getMessage();
669 if (message == null) {
670 return false;
671 }
672
673 okForAckAsDispatchDone.countDown();
674
675 // No reentrant lock - Patch needed to IndirectMessageReference on method lock
676 if (!isSlave()) {
677
678 MessageDispatch md = createMessageDispatch(node, message);
679 // NULL messages don't count... they don't get Acked.
680 if (node != QueueMessageReference.NULL_MESSAGE) {
681 dispatchCounter++;
682 dispatched.add(node);
683 } else {
684 while (true) {
685 int currentExtension = prefetchExtension.get();
686 int newExtension = Math.max(0, currentExtension - 1);
687 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
688 break;
689 }
690 }
691 }
692 if (info.isDispatchAsync()) {
693 md.setTransmitCallback(new Runnable() {
694
695 public void run() {
696 // Since the message gets queued up in async dispatch,
697 // we don't want to
698 // decrease the reference count until it gets put on the
699 // wire.
700 onDispatch(node, message);
701 }
702 });
703 context.getConnection().dispatchAsync(md);
704 } else {
705 context.getConnection().dispatchSync(md);
706 onDispatch(node, message);
707 }
708 return true;
709 } else {
710 return false;
711 }
712 }
713
714 protected void onDispatch(final MessageReference node, final Message message) {
715 if (node.getRegionDestination() != null) {
716 if (node != QueueMessageReference.NULL_MESSAGE) {
717 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
718 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
719 if (LOG.isTraceEnabled()) {
720 LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
721 + message.getDestination() + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
722 }
723 }
724 }
725
726 if (info.isDispatchAsync()) {
727 try {
728 dispatchPending();
729 } catch (IOException e) {
730 context.getConnection().serviceExceptionAsync(e);
731 }
732 }
733 }
734
735 /**
736 * inform the MessageConsumer on the client to change it's prefetch
737 *
738 * @param newPrefetch
739 */
740 public void updateConsumerPrefetch(int newPrefetch) {
741 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
742 ConsumerControl cc = new ConsumerControl();
743 cc.setConsumerId(info.getConsumerId());
744 cc.setPrefetch(newPrefetch);
745 context.getConnection().dispatchAsync(cc);
746 }
747 }
748
749 /**
750 * @param node
751 * @param message
752 * @return MessageDispatch
753 */
754 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
755 MessageDispatch md = new MessageDispatch();
756 md.setConsumerId(info.getConsumerId());
757
758 if (node == QueueMessageReference.NULL_MESSAGE) {
759 md.setMessage(null);
760 md.setDestination(null);
761 } else {
762 md.setDestination(node.getRegionDestination().getActiveMQDestination());
763 md.setMessage(message);
764 md.setRedeliveryCounter(node.getRedeliveryCounter());
765 }
766
767 return md;
768 }
769
770 /**
771 * Use when a matched message is about to be dispatched to the client.
772 *
773 * @param node
774 * @return false if the message should not be dispatched to the client
775 * (another sub may have already dispatched it for example).
776 * @throws IOException
777 */
778 protected abstract boolean canDispatch(MessageReference node) throws IOException;
779
780 protected abstract boolean isDropped(MessageReference node);
781
782 /**
783 * Used during acknowledgment to remove the message.
784 *
785 * @throws IOException
786 */
787 protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
788
789
790 public int getMaxProducersToAudit() {
791 return maxProducersToAudit;
792 }
793
794 public void setMaxProducersToAudit(int maxProducersToAudit) {
795 this.maxProducersToAudit = maxProducersToAudit;
796 }
797
798 public int getMaxAuditDepth() {
799 return maxAuditDepth;
800 }
801
802 public void setMaxAuditDepth(int maxAuditDepth) {
803 this.maxAuditDepth = maxAuditDepth;
804 }
805
806 public boolean isUsePrefetchExtension() {
807 return usePrefetchExtension;
808 }
809
810 public void setUsePrefetchExtension(boolean usePrefetchExtension) {
811 this.usePrefetchExtension = usePrefetchExtension;
812 }
813
814 protected int getPrefetchExtension() {
815 return this.prefetchExtension.get();
816 }
817
818 @Override
819 public void setPrefetchSize(int prefetchSize) {
820 this.info.setPrefetchSize(prefetchSize);
821 try {
822 this.dispatchPending();
823 } catch (Exception e) {
824 LOG.trace("Caught exception during dispatch after prefetch change.", e);
825 }
826 }
827 }