001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.LinkedList;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027 import java.util.concurrent.ExecutorService;
028 import java.util.concurrent.Executors;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 import java.util.concurrent.atomic.AtomicInteger;
031 import java.util.concurrent.atomic.AtomicReference;
032
033 import javax.jms.IllegalStateException;
034 import javax.jms.InvalidDestinationException;
035 import javax.jms.JMSException;
036 import javax.jms.Message;
037 import javax.jms.MessageConsumer;
038 import javax.jms.MessageListener;
039 import javax.jms.TransactionRolledBackException;
040
041 import org.apache.activemq.blob.BlobDownloader;
042 import org.apache.activemq.command.ActiveMQBlobMessage;
043 import org.apache.activemq.command.ActiveMQDestination;
044 import org.apache.activemq.command.ActiveMQMessage;
045 import org.apache.activemq.command.ActiveMQTempDestination;
046 import org.apache.activemq.command.CommandTypes;
047 import org.apache.activemq.command.ConsumerId;
048 import org.apache.activemq.command.ConsumerInfo;
049 import org.apache.activemq.command.MessageAck;
050 import org.apache.activemq.command.MessageDispatch;
051 import org.apache.activemq.command.MessageId;
052 import org.apache.activemq.command.MessagePull;
053 import org.apache.activemq.command.RemoveInfo;
054 import org.apache.activemq.command.TransactionId;
055 import org.apache.activemq.management.JMSConsumerStatsImpl;
056 import org.apache.activemq.management.StatsCapable;
057 import org.apache.activemq.management.StatsImpl;
058 import org.apache.activemq.selector.SelectorParser;
059 import org.apache.activemq.transaction.Synchronization;
060 import org.apache.activemq.util.Callback;
061 import org.apache.activemq.util.IntrospectionSupport;
062 import org.apache.activemq.util.JMSExceptionSupport;
063 import org.apache.activemq.util.ThreadPoolUtils;
064 import org.slf4j.Logger;
065 import org.slf4j.LoggerFactory;
066
067 /**
068 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
069 * from a destination. A <CODE> MessageConsumer</CODE> object is created by
070 * passing a <CODE>Destination</CODE> object to a message-consumer creation
071 * method supplied by a session.
072 * <P>
073 * <CODE>MessageConsumer</CODE> is the parent interface for all message
074 * consumers.
075 * <P>
076 * A message consumer can be created with a message selector. A message selector
077 * allows the client to restrict the messages delivered to the message consumer
078 * to those that match the selector.
079 * <P>
080 * A client may either synchronously receive a message consumer's messages or
081 * have the consumer asynchronously deliver them as they arrive.
082 * <P>
083 * For synchronous receipt, a client can request the next message from a message
084 * consumer using one of its <CODE> receive</CODE> methods. There are several
085 * variations of <CODE>receive</CODE> that allow a client to poll or wait for
086 * the next message.
087 * <P>
088 * For asynchronous delivery, a client can register a
089 * <CODE>MessageListener</CODE> object with a message consumer. As messages
090 * arrive at the message consumer, it delivers them by calling the
091 * <CODE>MessageListener</CODE>'s<CODE>
092 * onMessage</CODE> method.
093 * <P>
094 * It is a client programming error for a <CODE>MessageListener</CODE> to
095 * throw an exception.
096 *
097 *
098 * @see javax.jms.MessageConsumer
099 * @see javax.jms.QueueReceiver
100 * @see javax.jms.TopicSubscriber
101 * @see javax.jms.Session
102 */
103 public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
104
105 @SuppressWarnings("serial")
106 class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
107 final TransactionId transactionId;
108 public PreviouslyDeliveredMap(TransactionId transactionId) {
109 this.transactionId = transactionId;
110 }
111 }
112
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
// The session that created this consumer; assigned once in the constructor.
protected final ActiveMQSession session;
// The consumer description that is sent to the broker when the consumer is created.
protected final ConsumerInfo info;

// These are the messages waiting to be delivered to the client
protected final MessageDispatchChannel unconsumedMessages;

// These are the messages that were delivered to the consumer but that have
// not been acknowledged. The list is kept in reverse order (newest first)
// since we always walk it in reverse order.
private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
// track duplicate deliveries in a transaction such that the tx integrity can be validated
private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
// Number of messages folded into the current pendingAck (incremented by ackLater()).
private int deliveredCounter;
private int additionalWindowSize;
private long redeliveryDelay;
// Messages consumed since the last optimized acknowledge was sent.
private int ackCounter;
private int dispatchedCount;
private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
private final JMSConsumerStatsImpl stats;

// The validated selector in effect for this consumer, or null when there is none.
private final String selector;
private boolean synchronizationRegistered;
private final AtomicBoolean started = new AtomicBoolean(false);

private MessageAvailableListener availableListener;

private RedeliveryPolicy redeliveryPolicy;
private boolean optimizeAcknowledge;
// Set (via compareAndSet) while one thread is building/sending acknowledgements.
private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
// Lazily created by deliverAcks() and shut down by dispose().
private ExecutorService executorService;
private MessageTransformer transformer;
private boolean clearDispatchList;
// Incremented by inProgressClearRequired(), decremented by clearMessagesInProgress().
AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);

// The acknowledgement accumulated by ackLater() that has not been sent yet.
private MessageAck pendingAck;
// Broker sequence id of the last message handed to beforeMessageIsConsumed().
private long lastDeliveredSequenceId;

// When non-null, dequeue() throws this (wrapped) instead of returning null.
private IOException failureError;

// Time at which the last optimized acknowledge was sent.
private long optimizeAckTimestamp = System.currentTimeMillis();
// When > 0, an optimized acknowledge is forced once this many milliseconds have elapsed.
private long optimizeAcknowledgeTimeOut = 0;
private long optimizedAckScheduledAckInterval = 0;
// Cancelled on the connection scheduler by dispose().
private Runnable optimizedAckTask;
private long failoverRedeliveryWaitPeriod = 0;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
160
161 /**
162 * Create a MessageConsumer
163 *
164 * @param session
165 * @param dest
166 * @param name
167 * @param selector
168 * @param prefetch
169 * @param maximumPendingMessageCount
170 * @param noLocal
171 * @param browser
172 * @param dispatchAsync
173 * @param messageListener
174 * @throws JMSException
175 */
176 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
177 String name, String selector, int prefetch,
178 int maximumPendingMessageCount, boolean noLocal, boolean browser,
179 boolean dispatchAsync, MessageListener messageListener) throws JMSException {
180 if (dest == null) {
181 throw new InvalidDestinationException("Don't understand null destinations");
182 } else if (dest.getPhysicalName() == null) {
183 throw new InvalidDestinationException("The destination object was not given a physical name.");
184 } else if (dest.isTemporary()) {
185 String physicalName = dest.getPhysicalName();
186
187 if (physicalName == null) {
188 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
189 }
190
191 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
192
193 if (physicalName.indexOf(connectionID) < 0) {
194 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
195 }
196
197 if (session.connection.isDeleted(dest)) {
198 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
199 }
200 if (prefetch < 0) {
201 throw new JMSException("Cannot have a prefetch size less than zero");
202 }
203 }
204 if (session.connection.isMessagePrioritySupported()) {
205 this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
206 }else {
207 this.unconsumedMessages = new FifoMessageDispatchChannel();
208 }
209
210 this.session = session;
211 this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
212 setTransformer(session.getTransformer());
213
214 this.info = new ConsumerInfo(consumerId);
215 this.info.setExclusive(this.session.connection.isExclusiveConsumer());
216 this.info.setSubscriptionName(name);
217 this.info.setPrefetchSize(prefetch);
218 this.info.setCurrentPrefetchSize(prefetch);
219 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
220 this.info.setNoLocal(noLocal);
221 this.info.setDispatchAsync(dispatchAsync);
222 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
223 this.info.setSelector(null);
224
225 // Allows the options on the destination to configure the consumerInfo
226 if (dest.getOptions() != null) {
227 Map<String, Object> options = IntrospectionSupport.extractProperties(
228 new HashMap<String, Object>(dest.getOptions()), "consumer.");
229 IntrospectionSupport.setProperties(this.info, options);
230 if (options.size() > 0) {
231 String msg = "There are " + options.size()
232 + " consumer options that couldn't be set on the consumer."
233 + " Check the options are spelled correctly."
234 + " Unknown parameters=[" + options + "]."
235 + " This consumer cannot be started.";
236 LOG.warn(msg);
237 throw new ConfigurationException(msg);
238 }
239 }
240
241 this.info.setDestination(dest);
242 this.info.setBrowser(browser);
243 if (selector != null && selector.trim().length() != 0) {
244 // Validate the selector
245 SelectorParser.parse(selector);
246 this.info.setSelector(selector);
247 this.selector = selector;
248 } else if (info.getSelector() != null) {
249 // Validate the selector
250 SelectorParser.parse(this.info.getSelector());
251 this.selector = this.info.getSelector();
252 } else {
253 this.selector = null;
254 }
255
256 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
257 this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
258 && !info.isBrowser();
259 if (this.optimizeAcknowledge) {
260 this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
261 setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
262 }
263
264 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
265 this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
266 this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
267 this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
268 if (messageListener != null) {
269 setMessageListener(messageListener);
270 }
271 try {
272 this.session.addConsumer(this);
273 this.session.syncSendPacket(info);
274 } catch (JMSException e) {
275 this.session.removeConsumer(this);
276 throw e;
277 }
278
279 if (session.connection.isStarted()) {
280 start();
281 }
282 }
283
284 private boolean isAutoAcknowledgeEach() {
285 return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
286 }
287
288 private boolean isAutoAcknowledgeBatch() {
289 return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
290 }
291
292 public StatsImpl getStats() {
293 return stats;
294 }
295
296 public JMSConsumerStatsImpl getConsumerStats() {
297 return stats;
298 }
299
300 public RedeliveryPolicy getRedeliveryPolicy() {
301 return redeliveryPolicy;
302 }
303
304 /**
305 * Sets the redelivery policy used when messages are redelivered
306 */
307 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
308 this.redeliveryPolicy = redeliveryPolicy;
309 }
310
311 public MessageTransformer getTransformer() {
312 return transformer;
313 }
314
315 /**
316 * Sets the transformer used to transform messages before they are sent on
317 * to the JMS bus
318 */
319 public void setTransformer(MessageTransformer transformer) {
320 this.transformer = transformer;
321 }
322
323 /**
324 * @return Returns the value.
325 */
326 public ConsumerId getConsumerId() {
327 return info.getConsumerId();
328 }
329
330 /**
331 * @return the consumer name - used for durable consumers
332 */
333 public String getConsumerName() {
334 return this.info.getSubscriptionName();
335 }
336
337 /**
338 * @return true if this consumer does not accept locally produced messages
339 */
340 protected boolean isNoLocal() {
341 return info.isNoLocal();
342 }
343
344 /**
345 * Retrieve is a browser
346 *
347 * @return true if a browser
348 */
349 protected boolean isBrowser() {
350 return info.isBrowser();
351 }
352
353 /**
354 * @return ActiveMQDestination
355 */
356 protected ActiveMQDestination getDestination() {
357 return info.getDestination();
358 }
359
360 /**
361 * @return Returns the prefetchNumber.
362 */
363 public int getPrefetchNumber() {
364 return info.getPrefetchSize();
365 }
366
367 /**
368 * @return true if this is a durable topic subscriber
369 */
370 public boolean isDurableSubscriber() {
371 return info.getSubscriptionName() != null && info.getDestination().isTopic();
372 }
373
374 /**
375 * Gets this message consumer's message selector expression.
376 *
377 * @return this message consumer's message selector, or null if no message
378 * selector exists for the message consumer (that is, if the message
379 * selector was not set or was set to null or the empty string)
380 * @throws JMSException if the JMS provider fails to receive the next
381 * message due to some internal error.
382 */
383 public String getMessageSelector() throws JMSException {
384 checkClosed();
385 return selector;
386 }
387
388 /**
389 * Gets the message consumer's <CODE>MessageListener</CODE>.
390 *
391 * @return the listener for the message consumer, or null if no listener is
392 * set
393 * @throws JMSException if the JMS provider fails to get the message
394 * listener due to some internal error.
395 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
396 */
397 public MessageListener getMessageListener() throws JMSException {
398 checkClosed();
399 return this.messageListener.get();
400 }
401
402 /**
403 * Sets the message consumer's <CODE>MessageListener</CODE>.
404 * <P>
405 * Setting the message listener to null is the equivalent of unsetting the
406 * message listener for the message consumer.
407 * <P>
408 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
409 * while messages are being consumed by an existing listener or the consumer
410 * is being used to consume messages synchronously is undefined.
411 *
412 * @param listener the listener to which the messages are to be delivered
413 * @throws JMSException if the JMS provider fails to receive the next
414 * message due to some internal error.
415 * @see javax.jms.MessageConsumer#getMessageListener
416 */
417 public void setMessageListener(MessageListener listener) throws JMSException {
418 checkClosed();
419 if (info.getPrefetchSize() == 0) {
420 throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
421 }
422 if (listener != null) {
423 boolean wasRunning = session.isRunning();
424 if (wasRunning) {
425 session.stop();
426 }
427
428 this.messageListener.set(listener);
429 session.redispatch(this, unconsumedMessages);
430
431 if (wasRunning) {
432 session.start();
433 }
434 } else {
435 this.messageListener.set(null);
436 }
437 }
438
439 public MessageAvailableListener getAvailableListener() {
440 return availableListener;
441 }
442
443 /**
444 * Sets the listener used to notify synchronous consumers that there is a
445 * message available so that the {@link MessageConsumer#receiveNoWait()} can
446 * be called.
447 */
448 public void setAvailableListener(MessageAvailableListener availableListener) {
449 this.availableListener = availableListener;
450 }
451
452 /**
453 * Used to get an enqueued message from the unconsumedMessages list. The
454 * amount of time this method blocks is based on the timeout value. - if
455 * timeout==-1 then it blocks until a message is received. - if timeout==0
456 * then it it tries to not block at all, it returns a message if it is
457 * available - if timeout>0 then it blocks up to timeout amount of time.
458 * Expired messages will consumed by this method.
459 *
460 * @throws JMSException
461 * @return null if we timeout or if the consumer is closed.
462 */
463 private MessageDispatch dequeue(long timeout) throws JMSException {
464 try {
465 long deadline = 0;
466 if (timeout > 0) {
467 deadline = System.currentTimeMillis() + timeout;
468 }
469 while (true) {
470 MessageDispatch md = unconsumedMessages.dequeue(timeout);
471 if (md == null) {
472 if (timeout > 0 && !unconsumedMessages.isClosed()) {
473 timeout = Math.max(deadline - System.currentTimeMillis(), 0);
474 } else {
475 if (failureError != null) {
476 throw JMSExceptionSupport.create(failureError);
477 } else {
478 return null;
479 }
480 }
481 } else if (md.getMessage() == null) {
482 return null;
483 } else if (md.getMessage().isExpired()) {
484 if (LOG.isDebugEnabled()) {
485 LOG.debug(getConsumerId() + " received expired message: " + md);
486 }
487 beforeMessageIsConsumed(md);
488 afterMessageIsConsumed(md, true);
489 if (timeout > 0) {
490 timeout = Math.max(deadline - System.currentTimeMillis(), 0);
491 }
492 } else {
493 if (LOG.isTraceEnabled()) {
494 LOG.trace(getConsumerId() + " received message: " + md);
495 }
496 return md;
497 }
498 }
499 } catch (InterruptedException e) {
500 Thread.currentThread().interrupt();
501 throw JMSExceptionSupport.create(e);
502 }
503 }
504
505 /**
506 * Receives the next message produced for this message consumer.
507 * <P>
508 * This call blocks indefinitely until a message is produced or until this
509 * message consumer is closed.
510 * <P>
511 * If this <CODE>receive</CODE> is done within a transaction, the consumer
512 * retains the message until the transaction commits.
513 *
514 * @return the next message produced for this message consumer, or null if
515 * this message consumer is concurrently closed
516 */
517 public Message receive() throws JMSException {
518 checkClosed();
519 checkMessageListener();
520
521 sendPullCommand(0);
522 MessageDispatch md = dequeue(-1);
523 if (md == null) {
524 return null;
525 }
526
527 beforeMessageIsConsumed(md);
528 afterMessageIsConsumed(md, false);
529
530 return createActiveMQMessage(md);
531 }
532
533 /**
534 * @param md
535 * @return
536 */
537 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
538 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
539 if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
540 ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
541 }
542 if (transformer != null) {
543 Message transformedMessage = transformer.consumerTransform(session, this, m);
544 if (transformedMessage != null) {
545 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
546 }
547 }
548 if (session.isClientAcknowledge()) {
549 m.setAcknowledgeCallback(new Callback() {
550 public void execute() throws Exception {
551 session.checkClosed();
552 session.acknowledge();
553 }
554 });
555 } else if (session.isIndividualAcknowledge()) {
556 m.setAcknowledgeCallback(new Callback() {
557 public void execute() throws Exception {
558 session.checkClosed();
559 acknowledge(md);
560 }
561 });
562 }
563 return m;
564 }
565
566 /**
567 * Receives the next message that arrives within the specified timeout
568 * interval.
569 * <P>
570 * This call blocks until a message arrives, the timeout expires, or this
571 * message consumer is closed. A <CODE>timeout</CODE> of zero never
572 * expires, and the call blocks indefinitely.
573 *
574 * @param timeout the timeout value (in milliseconds), a time out of zero
575 * never expires.
576 * @return the next message produced for this message consumer, or null if
577 * the timeout expires or this message consumer is concurrently
578 * closed
579 */
580 public Message receive(long timeout) throws JMSException {
581 checkClosed();
582 checkMessageListener();
583 if (timeout == 0) {
584 return this.receive();
585 }
586
587 sendPullCommand(timeout);
588 while (timeout > 0) {
589
590 MessageDispatch md;
591 if (info.getPrefetchSize() == 0) {
592 md = dequeue(-1); // We let the broker let us know when we timeout.
593 } else {
594 md = dequeue(timeout);
595 }
596
597 if (md == null) {
598 return null;
599 }
600
601 beforeMessageIsConsumed(md);
602 afterMessageIsConsumed(md, false);
603 return createActiveMQMessage(md);
604 }
605 return null;
606 }
607
608 /**
609 * Receives the next message if one is immediately available.
610 *
611 * @return the next message produced for this message consumer, or null if
612 * one is not available
613 * @throws JMSException if the JMS provider fails to receive the next
614 * message due to some internal error.
615 */
616 public Message receiveNoWait() throws JMSException {
617 checkClosed();
618 checkMessageListener();
619 sendPullCommand(-1);
620
621 MessageDispatch md;
622 if (info.getPrefetchSize() == 0) {
623 md = dequeue(-1); // We let the broker let us know when we
624 // timeout.
625 } else {
626 md = dequeue(0);
627 }
628
629 if (md == null) {
630 return null;
631 }
632
633 beforeMessageIsConsumed(md);
634 afterMessageIsConsumed(md, false);
635 return createActiveMQMessage(md);
636 }
637
638 /**
639 * Closes the message consumer.
640 * <P>
641 * Since a provider may allocate some resources on behalf of a <CODE>
642 * MessageConsumer</CODE>
643 * outside the Java virtual machine, clients should close them when they are
644 * not needed. Relying on garbage collection to eventually reclaim these
645 * resources may not be timely enough.
646 * <P>
647 * This call blocks until a <CODE>receive</CODE> or message listener in
648 * progress has completed. A blocked message consumer <CODE>receive </CODE>
649 * call returns null when this message consumer is closed.
650 *
651 * @throws JMSException if the JMS provider fails to close the consumer due
652 * to some internal error.
653 */
654 public void close() throws JMSException {
655 if (!unconsumedMessages.isClosed()) {
656 if (session.getTransactionContext().isInTransaction()) {
657 session.getTransactionContext().addSynchronization(new Synchronization() {
658 @Override
659 public void afterCommit() throws Exception {
660 doClose();
661 }
662
663 @Override
664 public void afterRollback() throws Exception {
665 doClose();
666 }
667 });
668 } else {
669 doClose();
670 }
671 }
672 }
673
674 void doClose() throws JMSException {
675 // Store interrupted state and clear so that Transport operations don't
676 // throw InterruptedException and we ensure that resources are clened up.
677 boolean interrupted = Thread.interrupted();
678 dispose();
679 RemoveInfo removeCommand = info.createRemoveCommand();
680 if (LOG.isDebugEnabled()) {
681 LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
682 }
683 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
684 this.session.asyncSendPacket(removeCommand);
685 if (interrupted) {
686 Thread.currentThread().interrupt();
687 }
688 }
689
690 void inProgressClearRequired() {
691 inProgressClearRequiredFlag.incrementAndGet();
692 // deal with delivered messages async to avoid lock contention with in progress acks
693 clearDispatchList = true;
694 }
695
696 void clearMessagesInProgress() {
697 if (inProgressClearRequiredFlag.get() > 0) {
698 synchronized (unconsumedMessages.getMutex()) {
699 if (inProgressClearRequiredFlag.get() > 0) {
700 if (LOG.isDebugEnabled()) {
701 LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
702 }
703 // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
704 List<MessageDispatch> list = unconsumedMessages.removeAll();
705 if (!this.info.isBrowser()) {
706 for (MessageDispatch old : list) {
707 session.connection.rollbackDuplicate(this, old.getMessage());
708 }
709 }
710 // allow dispatch on this connection to resume
711 session.connection.transportInterruptionProcessingComplete();
712 inProgressClearRequiredFlag.decrementAndGet();
713
714 // Wake up any blockers and allow them to recheck state.
715 unconsumedMessages.getMutex().notifyAll();
716 }
717 }
718 }
719 }
720
721 void deliverAcks() {
722 MessageAck ack = null;
723 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
724 if (isAutoAcknowledgeEach()) {
725 synchronized(deliveredMessages) {
726 ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
727 if (ack != null) {
728 deliveredMessages.clear();
729 ackCounter = 0;
730 } else {
731 ack = pendingAck;
732 pendingAck = null;
733 }
734 }
735 } else if (pendingAck != null && pendingAck.isStandardAck()) {
736 ack = pendingAck;
737 pendingAck = null;
738 }
739 if (ack != null) {
740 final MessageAck ackToSend = ack;
741
742 if (executorService == null) {
743 executorService = Executors.newSingleThreadExecutor();
744 }
745 executorService.submit(new Runnable() {
746 public void run() {
747 try {
748 session.sendAck(ackToSend,true);
749 } catch (JMSException e) {
750 LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
751 } finally {
752 deliveryingAcknowledgements.set(false);
753 }
754 }
755 });
756 } else {
757 deliveryingAcknowledgements.set(false);
758 }
759 }
760 }
761
762 public void dispose() throws JMSException {
763 if (!unconsumedMessages.isClosed()) {
764
765 // Do we have any acks we need to send out before closing?
766 // Ack any delivered messages now.
767 if (!session.getTransacted()) {
768 deliverAcks();
769 if (isAutoAcknowledgeBatch()) {
770 acknowledge();
771 }
772 }
773 if (executorService != null) {
774 ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
775 executorService = null;
776 }
777 if (optimizedAckTask != null) {
778 this.session.connection.getScheduler().cancel(optimizedAckTask);
779 optimizedAckTask = null;
780 }
781
782 if (session.isClientAcknowledge()) {
783 if (!this.info.isBrowser()) {
784 // rollback duplicates that aren't acknowledged
785 List<MessageDispatch> tmp = null;
786 synchronized (this.deliveredMessages) {
787 tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
788 }
789 for (MessageDispatch old : tmp) {
790 this.session.connection.rollbackDuplicate(this, old.getMessage());
791 }
792 tmp.clear();
793 }
794 }
795 if (!session.isTransacted()) {
796 synchronized(deliveredMessages) {
797 deliveredMessages.clear();
798 }
799 }
800 unconsumedMessages.close();
801 this.session.removeConsumer(this);
802 List<MessageDispatch> list = unconsumedMessages.removeAll();
803 if (!this.info.isBrowser()) {
804 for (MessageDispatch old : list) {
805 // ensure we don't filter this as a duplicate
806 session.connection.rollbackDuplicate(this, old.getMessage());
807 }
808 }
809 }
810 }
811
812 /**
813 * @throws IllegalStateException
814 */
815 protected void checkClosed() throws IllegalStateException {
816 if (unconsumedMessages.isClosed()) {
817 throw new IllegalStateException("The Consumer is closed");
818 }
819 }
820
821 /**
822 * If we have a zero prefetch specified then send a pull command to the
823 * broker to pull a message we are about to receive
824 */
825 protected void sendPullCommand(long timeout) throws JMSException {
826 clearDispatchList();
827 if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
828 MessagePull messagePull = new MessagePull();
829 messagePull.configure(info);
830 messagePull.setTimeout(timeout);
831 session.asyncSendPacket(messagePull);
832 }
833 }
834
835 protected void checkMessageListener() throws JMSException {
836 session.checkMessageListener();
837 }
838
839 protected void setOptimizeAcknowledge(boolean value) {
840 if (optimizeAcknowledge && !value) {
841 deliverAcks();
842 }
843 optimizeAcknowledge = value;
844 }
845
846 protected void setPrefetchSize(int prefetch) {
847 deliverAcks();
848 this.info.setCurrentPrefetchSize(prefetch);
849 }
850
851 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
852 md.setDeliverySequenceId(session.getNextDeliveryId());
853 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
854 if (!isAutoAcknowledgeBatch()) {
855 synchronized(deliveredMessages) {
856 deliveredMessages.addFirst(md);
857 }
858 if (session.getTransacted()) {
859 if (transactedIndividualAck) {
860 immediateIndividualTransactedAck(md);
861 } else {
862 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
863 }
864 }
865 }
866 }
867
868 private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException {
869 // acks accumulate on the broker pending transaction completion to indicate
870 // delivery status
871 registerSync();
872 MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
873 ack.setTransactionId(session.getTransactionContext().getTransactionId());
874 session.syncSendPacket(ack);
875 }
876
877 private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
878 if (unconsumedMessages.isClosed()) {
879 return;
880 }
881 if (messageExpired) {
882 acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
883 stats.getExpiredMessageCount().increment();
884 } else {
885 stats.onMessage();
886 if (session.getTransacted()) {
887 // Do nothing.
888 } else if (isAutoAcknowledgeEach()) {
889 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
890 synchronized (deliveredMessages) {
891 if (!deliveredMessages.isEmpty()) {
892 if (optimizeAcknowledge) {
893 ackCounter++;
894
895 // AMQ-3956 evaluate both expired and normal msgs as
896 // otherwise consumer may get stalled
897 if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
898 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
899 if (ack != null) {
900 deliveredMessages.clear();
901 ackCounter = 0;
902 session.sendAck(ack);
903 optimizeAckTimestamp = System.currentTimeMillis();
904 }
905 // AMQ-3956 - as further optimization send
906 // ack for expired msgs when there are any.
907 // This resets the deliveredCounter to 0 so that
908 // we won't sent standard acks with every msg just
909 // because the deliveredCounter just below
910 // 0.5 * prefetch as used in ackLater()
911 if (pendingAck != null && deliveredCounter > 0) {
912 session.sendAck(pendingAck);
913 pendingAck = null;
914 deliveredCounter = 0;
915 }
916 }
917 } else {
918 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
919 if (ack!=null) {
920 deliveredMessages.clear();
921 session.sendAck(ack);
922 }
923 }
924 }
925 }
926 deliveryingAcknowledgements.set(false);
927 }
928 } else if (isAutoAcknowledgeBatch()) {
929 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
930 } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
931 boolean messageUnackedByConsumer = false;
932 synchronized (deliveredMessages) {
933 messageUnackedByConsumer = deliveredMessages.contains(md);
934 }
935 if (messageUnackedByConsumer) {
936 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
937 }
938 }
939 else {
940 throw new IllegalStateException("Invalid session state.");
941 }
942 }
943 }
944
945 /**
946 * Creates a MessageAck for all messages contained in deliveredMessages.
947 * Caller should hold the lock for deliveredMessages.
948 *
949 * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
950 * @return <code>null</code> if nothing to ack.
951 */
952 private MessageAck makeAckForAllDeliveredMessages(byte type) {
953 synchronized (deliveredMessages) {
954 if (deliveredMessages.isEmpty())
955 return null;
956
957 MessageDispatch md = deliveredMessages.getFirst();
958 MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
959 ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
960 return ack;
961 }
962 }
963
964 private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
965
966 // Don't acknowledge now, but we may need to let the broker know the
967 // consumer got the message to expand the pre-fetch window
968 if (session.getTransacted()) {
969 registerSync();
970 }
971
972 deliveredCounter++;
973
974 MessageAck oldPendingAck = pendingAck;
975 pendingAck = new MessageAck(md, ackType, deliveredCounter);
976 pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
977 if( oldPendingAck==null ) {
978 pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
979 } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
980 pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
981 } else {
982 // old pending ack being superseded by ack of another type, if is is not a delivered
983 // ack and hence important, send it now so it is not lost.
984 if ( !oldPendingAck.isDeliveredAck()) {
985 if (LOG.isDebugEnabled()) {
986 LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
987 }
988 session.sendAck(oldPendingAck);
989 } else {
990 if (LOG.isDebugEnabled()) {
991 LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
992 }
993 }
994 }
995 // AMQ-3956 evaluate both expired and normal msgs as
996 // otherwise consumer may get stalled
997 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
998 session.sendAck(pendingAck);
999 pendingAck=null;
1000 deliveredCounter = 0;
1001 additionalWindowSize = 0;
1002 }
1003 }
1004
1005 private void registerSync() throws JMSException {
1006 session.doStartTransaction();
1007 if (!synchronizationRegistered) {
1008 synchronizationRegistered = true;
1009 session.getTransactionContext().addSynchronization(new Synchronization() {
1010 @Override
1011 public void beforeEnd() throws Exception {
1012 if (transactedIndividualAck) {
1013 clearDispatchList();
1014 waitForRedeliveries();
1015 synchronized(deliveredMessages) {
1016 rollbackOnFailedRecoveryRedelivery();
1017 }
1018 } else {
1019 acknowledge();
1020 }
1021 synchronizationRegistered = false;
1022 }
1023
1024 @Override
1025 public void afterCommit() throws Exception {
1026 commit();
1027 synchronizationRegistered = false;
1028 }
1029
1030 @Override
1031 public void afterRollback() throws Exception {
1032 rollback();
1033 synchronizationRegistered = false;
1034 }
1035 });
1036 }
1037 }
1038
1039 /**
1040 * Acknowledge all the messages that have been delivered to the client up to
1041 * this point.
1042 *
1043 * @throws JMSException
1044 */
1045 public void acknowledge() throws JMSException {
1046 clearDispatchList();
1047 waitForRedeliveries();
1048 synchronized(deliveredMessages) {
1049 // Acknowledge all messages so far.
1050 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
1051 if (ack == null)
1052 return; // no msgs
1053
1054 if (session.getTransacted()) {
1055 rollbackOnFailedRecoveryRedelivery();
1056 session.doStartTransaction();
1057 ack.setTransactionId(session.getTransactionContext().getTransactionId());
1058 }
1059
1060 pendingAck = null;
1061 session.sendAck(ack);
1062
1063 // Adjust the counters
1064 deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1065 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1066
1067 if (!session.getTransacted()) {
1068 deliveredMessages.clear();
1069 }
1070 }
1071 }
1072
1073 private void waitForRedeliveries() {
1074 if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1075 long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1076 int numberNotReplayed;
1077 do {
1078 numberNotReplayed = 0;
1079 synchronized(deliveredMessages) {
1080 if (previouslyDeliveredMessages != null) {
1081 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1082 if (!entry.getValue()) {
1083 numberNotReplayed++;
1084 }
1085 }
1086 }
1087 }
1088 if (numberNotReplayed > 0) {
1089 LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
1090 + previouslyDeliveredMessages.transactionId + ", to consumer :" + this.getConsumerId());
1091 try {
1092 Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1093 } catch (InterruptedException outOfhere) {
1094 break;
1095 }
1096 }
1097 } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1098 }
1099 }
1100
1101 /*
1102 * called with deliveredMessages locked
1103 */
1104 private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1105 if (previouslyDeliveredMessages != null) {
1106 // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1107 // as messages have been dispatched else where.
1108 int numberNotReplayed = 0;
1109 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1110 if (!entry.getValue()) {
1111 numberNotReplayed++;
1112 if (LOG.isDebugEnabled()) {
1113 LOG.debug("previously delivered message has not been replayed in transaction: "
1114 + previouslyDeliveredMessages.transactionId
1115 + " , messageId: " + entry.getKey());
1116 }
1117 }
1118 }
1119 if (numberNotReplayed > 0) {
1120 String message = "rolling back transaction ("
1121 + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1122 + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1123 LOG.warn(message);
1124 throw new TransactionRolledBackException(message);
1125 }
1126 }
1127 }
1128
1129 void acknowledge(MessageDispatch md) throws JMSException {
1130 acknowledge(md, MessageAck.INDIVIDUAL_ACK_TYPE);
1131 }
1132
1133 void acknowledge(MessageDispatch md, byte ackType) throws JMSException {
1134 MessageAck ack = new MessageAck(md, ackType, 1);
1135 session.sendAck(ack);
1136 synchronized(deliveredMessages){
1137 deliveredMessages.remove(md);
1138 }
1139 }
1140
1141 public void commit() throws JMSException {
1142 synchronized (deliveredMessages) {
1143 deliveredMessages.clear();
1144 clearPreviouslyDelivered();
1145 }
1146 redeliveryDelay = 0;
1147 }
1148
1149 public void rollback() throws JMSException {
1150 synchronized (unconsumedMessages.getMutex()) {
1151 if (optimizeAcknowledge) {
1152 // remove messages read but not acked at the broker yet through
1153 // optimizeAcknowledge
1154 if (!this.info.isBrowser()) {
1155 synchronized(deliveredMessages) {
1156 for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
1157 // ensure we don't filter this as a duplicate
1158 MessageDispatch md = deliveredMessages.removeLast();
1159 session.connection.rollbackDuplicate(this, md.getMessage());
1160 }
1161 }
1162 }
1163 }
1164 synchronized(deliveredMessages) {
1165 rollbackPreviouslyDeliveredAndNotRedelivered();
1166 if (deliveredMessages.isEmpty()) {
1167 return;
1168 }
1169
1170 // use initial delay for first redelivery
1171 MessageDispatch lastMd = deliveredMessages.getFirst();
1172 final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
1173 if (currentRedeliveryCount > 0) {
1174 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
1175 } else {
1176 redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
1177 }
1178 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
1179
1180 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1181 MessageDispatch md = iter.next();
1182 md.getMessage().onMessageRolledBack();
1183 // ensure we don't filter this as a duplicate
1184 session.connection.rollbackDuplicate(this, md.getMessage());
1185 }
1186
1187 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
1188 && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
1189 // We need to NACK the messages so that they get sent to the
1190 // DLQ.
1191 // Acknowledge the last message.
1192
1193 MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
1194 ack.setPoisonCause(lastMd.getRollbackCause());
1195 ack.setFirstMessageId(firstMsgId);
1196 session.sendAck(ack,true);
1197 // Adjust the window size.
1198 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1199 redeliveryDelay = 0;
1200 } else {
1201
1202 // only redelivery_ack after first delivery
1203 if (currentRedeliveryCount > 0) {
1204 MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
1205 ack.setFirstMessageId(firstMsgId);
1206 session.sendAck(ack,true);
1207 }
1208
1209 // stop the delivery of messages.
1210 if (nonBlockingRedelivery) {
1211 if (!unconsumedMessages.isClosed()) {
1212
1213 final LinkedList<MessageDispatch> pendingRedeliveries =
1214 new LinkedList<MessageDispatch>(deliveredMessages);
1215
1216 // Start up the delivery again a little later.
1217 session.getScheduler().executeAfterDelay(new Runnable() {
1218 public void run() {
1219 try {
1220 if (!unconsumedMessages.isClosed()) {
1221 for(MessageDispatch dispatch : pendingRedeliveries) {
1222 session.dispatch(dispatch);
1223 }
1224 }
1225 } catch (Exception e) {
1226 session.connection.onAsyncException(e);
1227 }
1228 }
1229 }, redeliveryDelay);
1230 }
1231
1232 } else {
1233 unconsumedMessages.stop();
1234
1235 for (MessageDispatch md : deliveredMessages) {
1236 unconsumedMessages.enqueueFirst(md);
1237 }
1238
1239 if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
1240 // Start up the delivery again a little later.
1241 session.getScheduler().executeAfterDelay(new Runnable() {
1242 public void run() {
1243 try {
1244 if (started.get()) {
1245 start();
1246 }
1247 } catch (JMSException e) {
1248 session.connection.onAsyncException(e);
1249 }
1250 }
1251 }, redeliveryDelay);
1252 } else {
1253 start();
1254 }
1255 }
1256 }
1257 deliveredCounter -= deliveredMessages.size();
1258 deliveredMessages.clear();
1259 }
1260 }
1261 if (messageListener.get() != null) {
1262 session.redispatch(this, unconsumedMessages);
1263 }
1264 }
1265
1266 /*
1267 * called with unconsumedMessages && deliveredMessages locked
1268 * remove any message not re-delivered as they can't be replayed to this
1269 * consumer on rollback
1270 */
1271 private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1272 if (previouslyDeliveredMessages != null) {
1273 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1274 if (!entry.getValue()) {
1275 removeFromDeliveredMessages(entry.getKey());
1276 }
1277 }
1278 clearPreviouslyDelivered();
1279 }
1280 }
1281
1282 /*
1283 * called with deliveredMessages locked
1284 */
1285 private void removeFromDeliveredMessages(MessageId key) {
1286 Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1287 while (iterator.hasNext()) {
1288 MessageDispatch candidate = iterator.next();
1289 if (key.equals(candidate.getMessage().getMessageId())) {
1290 session.connection.rollbackDuplicate(this, candidate.getMessage());
1291 iterator.remove();
1292 break;
1293 }
1294 }
1295 }
1296
1297 /*
1298 * called with deliveredMessages locked
1299 */
1300 private void clearPreviouslyDelivered() {
1301 if (previouslyDeliveredMessages != null) {
1302 previouslyDeliveredMessages.clear();
1303 previouslyDeliveredMessages = null;
1304 }
1305 }
1306
1307 public void dispatch(MessageDispatch md) {
1308 MessageListener listener = this.messageListener.get();
1309 try {
1310 clearMessagesInProgress();
1311 clearDispatchList();
1312 synchronized (unconsumedMessages.getMutex()) {
1313 if (!unconsumedMessages.isClosed()) {
1314 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1315 if (listener != null && unconsumedMessages.isRunning()) {
1316 ActiveMQMessage message = createActiveMQMessage(md);
1317 beforeMessageIsConsumed(md);
1318 try {
1319 boolean expired = message.isExpired();
1320 if (!expired) {
1321 listener.onMessage(message);
1322 }
1323 afterMessageIsConsumed(md, expired);
1324 } catch (RuntimeException e) {
1325 LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
1326 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
1327 // schedual redelivery and possible dlq processing
1328 md.setRollbackCause(e);
1329 rollback();
1330 } else {
1331 // Transacted or Client ack: Deliver the
1332 // next message.
1333 afterMessageIsConsumed(md, false);
1334 }
1335 }
1336 } else {
1337 if (!unconsumedMessages.isRunning()) {
1338 // delayed redelivery, ensure it can be re delivered
1339 session.connection.rollbackDuplicate(this, md.getMessage());
1340 }
1341 unconsumedMessages.enqueue(md);
1342 if (availableListener != null) {
1343 availableListener.onMessageAvailable(this);
1344 }
1345 }
1346 } else {
1347 if (!session.isTransacted()) {
1348 LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
1349 + " to consumer: " + getConsumerId() + ", ignoring (auto acking) duplicate: " + md);
1350 MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
1351 session.sendAck(ack);
1352 } else {
1353 if (LOG.isDebugEnabled()) {
1354 LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
1355 }
1356 boolean needsPoisonAck = false;
1357 synchronized (deliveredMessages) {
1358 if (previouslyDeliveredMessages != null) {
1359 previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
1360 } else {
1361 // delivery while pending redelivery to another consumer on the same connection
1362 // not waiting for redelivery will help here
1363 needsPoisonAck = true;
1364 }
1365 }
1366 if (needsPoisonAck) {
1367 MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
1368 poisonAck.setFirstMessageId(md.getMessage().getMessageId());
1369 poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
1370 + session.getConnection().getConnectionInfo().getConnectionId()));
1371 LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
1372 + " consumer on this connection, failoverRedeliveryWaitPeriod="
1373 + failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck);
1374 session.sendAck(poisonAck);
1375 } else {
1376 if (transactedIndividualAck) {
1377 immediateIndividualTransactedAck(md);
1378 } else {
1379 session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
1380 }
1381 }
1382 }
1383 }
1384 }
1385 }
1386 if (++dispatchedCount % 1000 == 0) {
1387 dispatchedCount = 0;
1388 Thread.yield();
1389 }
1390 } catch (Exception e) {
1391 session.connection.onClientInternalException(e);
1392 }
1393 }
1394
1395 // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1396 private void clearDispatchList() {
1397 if (clearDispatchList) {
1398 synchronized (deliveredMessages) {
1399 if (clearDispatchList) {
1400 if (!deliveredMessages.isEmpty()) {
1401 if (session.isTransacted()) {
1402 if (LOG.isDebugEnabled()) {
1403 LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1404 }
1405 if (previouslyDeliveredMessages == null) {
1406 previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1407 }
1408 for (MessageDispatch delivered : deliveredMessages) {
1409 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1410 }
1411 } else {
1412 if (LOG.isDebugEnabled()) {
1413 LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1414 }
1415 deliveredMessages.clear();
1416 pendingAck = null;
1417 }
1418 }
1419 clearDispatchList = false;
1420 }
1421 }
1422 }
1423 }
1424
1425 public int getMessageSize() {
1426 return unconsumedMessages.size();
1427 }
1428
1429 public void start() throws JMSException {
1430 if (unconsumedMessages.isClosed()) {
1431 return;
1432 }
1433 started.set(true);
1434 unconsumedMessages.start();
1435 session.executor.wakeup();
1436 }
1437
1438 public void stop() {
1439 started.set(false);
1440 unconsumedMessages.stop();
1441 }
1442
1443 @Override
1444 public String toString() {
1445 return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1446 + " }";
1447 }
1448
1449 /**
1450 * Delivers a message to the message listener.
1451 *
1452 * @return
1453 * @throws JMSException
1454 */
1455 public boolean iterate() {
1456 MessageListener listener = this.messageListener.get();
1457 if (listener != null) {
1458 MessageDispatch md = unconsumedMessages.dequeueNoWait();
1459 if (md != null) {
1460 dispatch(md);
1461 return true;
1462 }
1463 }
1464 return false;
1465 }
1466
1467 public boolean isInUse(ActiveMQTempDestination destination) {
1468 return info.getDestination().equals(destination);
1469 }
1470
1471 public long getLastDeliveredSequenceId() {
1472 return lastDeliveredSequenceId;
1473 }
1474
1475 public IOException getFailureError() {
1476 return failureError;
1477 }
1478
1479 public void setFailureError(IOException failureError) {
1480 this.failureError = failureError;
1481 }
1482
1483 /**
1484 * @return the optimizedAckScheduledAckInterval
1485 */
1486 public long getOptimizedAckScheduledAckInterval() {
1487 return optimizedAckScheduledAckInterval;
1488 }
1489
1490 /**
1491 * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set
1492 */
1493 public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException {
1494 this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1495
1496 if (this.optimizedAckTask != null) {
1497 try {
1498 this.session.connection.getScheduler().cancel(optimizedAckTask);
1499 } catch (JMSException e) {
1500 LOG.debug("Caught exception while cancelling old optimized ack task", e);
1501 throw e;
1502 }
1503 this.optimizedAckTask = null;
1504 }
1505
1506 // Should we periodically send out all outstanding acks.
1507 if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) {
1508 this.optimizedAckTask = new Runnable() {
1509
1510 @Override
1511 public void run() {
1512 try {
1513 if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
1514 if (LOG.isInfoEnabled()) {
1515 LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
1516 }
1517 deliverAcks();
1518 }
1519 } catch (Exception e) {
1520 LOG.debug("Optimized Ack Task caught exception during ack", e);
1521 }
1522 }
1523 };
1524
1525 try {
1526 this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval);
1527 } catch (JMSException e) {
1528 LOG.debug("Caught exception while scheduling new optimized ack task", e);
1529 throw e;
1530 }
1531 }
1532 }
1533 }