001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicReference;
033
034import javax.jms.IllegalStateException;
035import javax.jms.InvalidDestinationException;
036import javax.jms.JMSException;
037import javax.jms.Message;
038import javax.jms.MessageConsumer;
039import javax.jms.MessageListener;
040import javax.jms.TransactionRolledBackException;
041
042import org.apache.activemq.blob.BlobDownloader;
043import org.apache.activemq.command.ActiveMQBlobMessage;
044import org.apache.activemq.command.ActiveMQDestination;
045import org.apache.activemq.command.ActiveMQMessage;
046import org.apache.activemq.command.ActiveMQObjectMessage;
047import org.apache.activemq.command.ActiveMQTempDestination;
048import org.apache.activemq.command.CommandTypes;
049import org.apache.activemq.command.ConsumerId;
050import org.apache.activemq.command.ConsumerInfo;
051import org.apache.activemq.command.MessageAck;
052import org.apache.activemq.command.MessageDispatch;
053import org.apache.activemq.command.MessageId;
054import org.apache.activemq.command.MessagePull;
055import org.apache.activemq.command.RemoveInfo;
056import org.apache.activemq.command.TransactionId;
057import org.apache.activemq.management.JMSConsumerStatsImpl;
058import org.apache.activemq.management.StatsCapable;
059import org.apache.activemq.management.StatsImpl;
060import org.apache.activemq.selector.SelectorParser;
061import org.apache.activemq.transaction.Synchronization;
062import org.apache.activemq.util.Callback;
063import org.apache.activemq.util.IntrospectionSupport;
064import org.apache.activemq.util.JMSExceptionSupport;
065import org.apache.activemq.util.ThreadPoolUtils;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069/**
070 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
071 * from a destination. A <CODE> MessageConsumer</CODE> object is created by
072 * passing a <CODE>Destination</CODE> object to a message-consumer creation
073 * method supplied by a session.
074 * <P>
075 * <CODE>MessageConsumer</CODE> is the parent interface for all message
076 * consumers.
077 * <P>
078 * A message consumer can be created with a message selector. A message selector
079 * allows the client to restrict the messages delivered to the message consumer
080 * to those that match the selector.
081 * <P>
082 * A client may either synchronously receive a message consumer's messages or
083 * have the consumer asynchronously deliver them as they arrive.
084 * <P>
085 * For synchronous receipt, a client can request the next message from a message
086 * consumer using one of its <CODE> receive</CODE> methods. There are several
087 * variations of <CODE>receive</CODE> that allow a client to poll or wait for
088 * the next message.
089 * <P>
090 * For asynchronous delivery, a client can register a
091 * <CODE>MessageListener</CODE> object with a message consumer. As messages
092 * arrive at the message consumer, it delivers them by calling the
093 * <CODE>MessageListener</CODE>'s<CODE>
094 * onMessage</CODE> method.
095 * <P>
096 * It is a client programming error for a <CODE>MessageListener</CODE> to
097 * throw an exception.
098 *
099 *
100 * @see javax.jms.MessageConsumer
101 * @see javax.jms.QueueReceiver
102 * @see javax.jms.TopicSubscriber
103 * @see javax.jms.Session
104 */
105public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
106
107    @SuppressWarnings("serial")
108    class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
109        final TransactionId transactionId;
110        public PreviouslyDeliveredMap(TransactionId transactionId) {
111            this.transactionId = transactionId;
112        }
113    }
114
115    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
116    protected final ActiveMQSession session;
117    protected final ConsumerInfo info;
118
119    // These are the messages waiting to be delivered to the client
120    protected final MessageDispatchChannel unconsumedMessages;
121
122    // The are the messages that were delivered to the consumer but that have
123    // not been acknowledged. It's kept in reverse order since we
124    // Always walk list in reverse order.
125    protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
126    // track duplicate deliveries in a transaction such that the tx integrity can be validated
127    private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
128    private int deliveredCounter;
129    private int additionalWindowSize;
130    private long redeliveryDelay;
131    private int ackCounter;
132    private int dispatchedCount;
133    private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
134    private final JMSConsumerStatsImpl stats;
135
136    private final String selector;
137    private boolean synchronizationRegistered;
138    private final AtomicBoolean started = new AtomicBoolean(false);
139
140    private MessageAvailableListener availableListener;
141
142    private RedeliveryPolicy redeliveryPolicy;
143    private boolean optimizeAcknowledge;
144    private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
145    private ExecutorService executorService;
146    private MessageTransformer transformer;
147    private boolean clearDeliveredList;
148    AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
149
150    private MessageAck pendingAck;
151    private long lastDeliveredSequenceId = -1;
152
153    private IOException failureError;
154
155    private long optimizeAckTimestamp = System.currentTimeMillis();
156    private long optimizeAcknowledgeTimeOut = 0;
157    private long optimizedAckScheduledAckInterval = 0;
158    private Runnable optimizedAckTask;
159    private long failoverRedeliveryWaitPeriod = 0;
160    private boolean transactedIndividualAck = false;
161    private boolean nonBlockingRedelivery = false;
162    private boolean consumerExpiryCheckEnabled = true;
163
164    /**
165     * Create a MessageConsumer
166     *
167     * @param session
168     * @param dest
169     * @param name
170     * @param selector
171     * @param prefetch
172     * @param maximumPendingMessageCount
173     * @param noLocal
174     * @param browser
175     * @param dispatchAsync
176     * @param messageListener
177     * @throws JMSException
178     */
179    public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
180            String name, String selector, int prefetch,
181            int maximumPendingMessageCount, boolean noLocal, boolean browser,
182            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
183        if (dest == null) {
184            throw new InvalidDestinationException("Don't understand null destinations");
185        } else if (dest.getPhysicalName() == null) {
186            throw new InvalidDestinationException("The destination object was not given a physical name.");
187        } else if (dest.isTemporary()) {
188            String physicalName = dest.getPhysicalName();
189
190            if (physicalName == null) {
191                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
192            }
193
194            String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
195
196            if (physicalName.indexOf(connectionID) < 0) {
197                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
198            }
199
200            if (session.connection.isDeleted(dest)) {
201                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
202            }
203            if (prefetch < 0) {
204                throw new JMSException("Cannot have a prefetch size less than zero");
205            }
206        }
207        if (session.connection.isMessagePrioritySupported()) {
208            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
209        }else {
210            this.unconsumedMessages = new FifoMessageDispatchChannel();
211        }
212
213        this.session = session;
214        this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
215        if (this.redeliveryPolicy == null) {
216            this.redeliveryPolicy = new RedeliveryPolicy();
217        }
218        setTransformer(session.getTransformer());
219
220        this.info = new ConsumerInfo(consumerId);
221        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
222        this.info.setClientId(this.session.connection.getClientID());
223        this.info.setSubscriptionName(name);
224        this.info.setPrefetchSize(prefetch);
225        this.info.setCurrentPrefetchSize(prefetch);
226        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
227        this.info.setNoLocal(noLocal);
228        this.info.setDispatchAsync(dispatchAsync);
229        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
230        this.info.setSelector(null);
231
232        // Allows the options on the destination to configure the consumerInfo
233        if (dest.getOptions() != null) {
234            Map<String, Object> options = IntrospectionSupport.extractProperties(
235                new HashMap<String, Object>(dest.getOptions()), "consumer.");
236            IntrospectionSupport.setProperties(this.info, options);
237            if (options.size() > 0) {
238                String msg = "There are " + options.size()
239                    + " consumer options that couldn't be set on the consumer."
240                    + " Check the options are spelled correctly."
241                    + " Unknown parameters=[" + options + "]."
242                    + " This consumer cannot be started.";
243                LOG.warn(msg);
244                throw new ConfigurationException(msg);
245            }
246        }
247
248        this.info.setDestination(dest);
249        this.info.setBrowser(browser);
250        if (selector != null && selector.trim().length() != 0) {
251            // Validate the selector
252            SelectorParser.parse(selector);
253            this.info.setSelector(selector);
254            this.selector = selector;
255        } else if (info.getSelector() != null) {
256            // Validate the selector
257            SelectorParser.parse(this.info.getSelector());
258            this.selector = this.info.getSelector();
259        } else {
260            this.selector = null;
261        }
262
263        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
264        this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
265                                   && !info.isBrowser();
266        if (this.optimizeAcknowledge) {
267            this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
268            setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
269        }
270
271        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
272        this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
273        this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
274        this.transactedIndividualAck = session.connection.isTransactedIndividualAck()
275                        || this.nonBlockingRedelivery
276                        || session.connection.isMessagePrioritySupported();
277        this.consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled();
278        if (messageListener != null) {
279            setMessageListener(messageListener);
280        }
281        try {
282            this.session.addConsumer(this);
283            this.session.syncSendPacket(info);
284        } catch (JMSException e) {
285            this.session.removeConsumer(this);
286            throw e;
287        }
288
289        if (session.connection.isStarted()) {
290            start();
291        }
292    }
293
294    private boolean isAutoAcknowledgeEach() {
295        return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
296    }
297
298    private boolean isAutoAcknowledgeBatch() {
299        return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
300    }
301
302    @Override
303    public StatsImpl getStats() {
304        return stats;
305    }
306
307    public JMSConsumerStatsImpl getConsumerStats() {
308        return stats;
309    }
310
311    public RedeliveryPolicy getRedeliveryPolicy() {
312        return redeliveryPolicy;
313    }
314
315    /**
316     * Sets the redelivery policy used when messages are redelivered
317     */
318    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
319        this.redeliveryPolicy = redeliveryPolicy;
320    }
321
322    public MessageTransformer getTransformer() {
323        return transformer;
324    }
325
326    /**
327     * Sets the transformer used to transform messages before they are sent on
328     * to the JMS bus
329     */
330    public void setTransformer(MessageTransformer transformer) {
331        this.transformer = transformer;
332    }
333
334    /**
335     * @return Returns the value.
336     */
337    public ConsumerId getConsumerId() {
338        return info.getConsumerId();
339    }
340
341    /**
342     * @return the consumer name - used for durable consumers
343     */
344    public String getConsumerName() {
345        return this.info.getSubscriptionName();
346    }
347
348    /**
349     * @return true if this consumer does not accept locally produced messages
350     */
351    protected boolean isNoLocal() {
352        return info.isNoLocal();
353    }
354
355    /**
356     * Retrieve is a browser
357     *
358     * @return true if a browser
359     */
360    protected boolean isBrowser() {
361        return info.isBrowser();
362    }
363
364    /**
365     * @return ActiveMQDestination
366     */
367    protected ActiveMQDestination getDestination() {
368        return info.getDestination();
369    }
370
371    /**
372     * @return Returns the prefetchNumber.
373     */
374    public int getPrefetchNumber() {
375        return info.getPrefetchSize();
376    }
377
378    /**
379     * @return true if this is a durable topic subscriber
380     */
381    public boolean isDurableSubscriber() {
382        return info.getSubscriptionName() != null && info.getDestination().isTopic();
383    }
384
385    /**
386     * Gets this message consumer's message selector expression.
387     *
388     * @return this message consumer's message selector, or null if no message
389     *         selector exists for the message consumer (that is, if the message
390     *         selector was not set or was set to null or the empty string)
391     * @throws JMSException if the JMS provider fails to receive the next
392     *                 message due to some internal error.
393     */
394    @Override
395    public String getMessageSelector() throws JMSException {
396        checkClosed();
397        return selector;
398    }
399
400    /**
401     * Gets the message consumer's <CODE>MessageListener</CODE>.
402     *
403     * @return the listener for the message consumer, or null if no listener is
404     *         set
405     * @throws JMSException if the JMS provider fails to get the message
406     *                 listener due to some internal error.
407     * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
408     */
409    @Override
410    public MessageListener getMessageListener() throws JMSException {
411        checkClosed();
412        return this.messageListener.get();
413    }
414
415    /**
416     * Sets the message consumer's <CODE>MessageListener</CODE>.
417     * <P>
418     * Setting the message listener to null is the equivalent of unsetting the
419     * message listener for the message consumer.
420     * <P>
421     * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
422     * while messages are being consumed by an existing listener or the consumer
423     * is being used to consume messages synchronously is undefined.
424     *
425     * @param listener the listener to which the messages are to be delivered
426     * @throws JMSException if the JMS provider fails to receive the next
427     *                 message due to some internal error.
428     * @see javax.jms.MessageConsumer#getMessageListener
429     */
430    @Override
431    public void setMessageListener(MessageListener listener) throws JMSException {
432        checkClosed();
433        if (info.getPrefetchSize() == 0) {
434            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
435        }
436        if (listener != null) {
437            boolean wasRunning = session.isRunning();
438            if (wasRunning) {
439                session.stop();
440            }
441
442            this.messageListener.set(listener);
443            session.redispatch(this, unconsumedMessages);
444
445            if (wasRunning) {
446                session.start();
447            }
448        } else {
449            this.messageListener.set(null);
450        }
451    }
452
453    @Override
454    public MessageAvailableListener getAvailableListener() {
455        return availableListener;
456    }
457
458    /**
459     * Sets the listener used to notify synchronous consumers that there is a
460     * message available so that the {@link MessageConsumer#receiveNoWait()} can
461     * be called.
462     */
463    @Override
464    public void setAvailableListener(MessageAvailableListener availableListener) {
465        this.availableListener = availableListener;
466    }
467
468    /**
469     * Used to get an enqueued message from the unconsumedMessages list. The
470     * amount of time this method blocks is based on the timeout value. - if
471     * timeout==-1 then it blocks until a message is received. - if timeout==0
472     * then it it tries to not block at all, it returns a message if it is
473     * available - if timeout>0 then it blocks up to timeout amount of time.
474     * Expired messages will consumed by this method.
475     *
476     * @throws JMSException
477     * @return null if we timeout or if the consumer is closed.
478     */
479    private MessageDispatch dequeue(long timeout) throws JMSException {
480        try {
481            long deadline = 0;
482            if (timeout > 0) {
483                deadline = System.currentTimeMillis() + timeout;
484            }
485            while (true) {
486                MessageDispatch md = unconsumedMessages.dequeue(timeout);
487                if (md == null) {
488                    if (timeout > 0 && !unconsumedMessages.isClosed()) {
489                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
490                    } else {
491                        if (failureError != null) {
492                            throw JMSExceptionSupport.create(failureError);
493                        } else {
494                            return null;
495                        }
496                    }
497                } else if (md.getMessage() == null) {
498                    return null;
499                } else if (consumeExpiredMessage(md)) {
500                    LOG.debug("{} received expired message: {}", getConsumerId(), md);
501                    beforeMessageIsConsumed(md);
502                    afterMessageIsConsumed(md, true);
503                    if (timeout > 0) {
504                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
505                    }
506                    sendPullCommand(timeout);
507                } else if (redeliveryExceeded(md)) {
508                    LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
509                    posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
510                    if (timeout > 0) {
511                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
512                    }
513                    sendPullCommand(timeout);
514                } else {
515                    if (LOG.isTraceEnabled()) {
516                        LOG.trace(getConsumerId() + " received message: " + md);
517                    }
518                    return md;
519                }
520            }
521        } catch (InterruptedException e) {
522            Thread.currentThread().interrupt();
523            throw JMSExceptionSupport.create(e);
524        }
525    }
526
527    private boolean consumeExpiredMessage(MessageDispatch dispatch) {
528        if (dispatch.getMessage().isExpired()) {
529            return !isBrowser() && isConsumerExpiryCheckEnabled();
530        }
531
532        return false;
533    }
534
535    private void posionAck(MessageDispatch md, String cause) throws JMSException {
536        MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
537        posionAck.setFirstMessageId(md.getMessage().getMessageId());
538        posionAck.setPoisonCause(new Throwable(cause));
539        session.sendAck(posionAck);
540    }
541
542    private boolean redeliveryExceeded(MessageDispatch md) {
543        try {
544            return session.getTransacted()
545                    && redeliveryPolicy != null
546                    && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
547                    && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
548                    // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
549                    && md.getMessage().getProperty("redeliveryDelay") == null;
550        } catch (Exception ignored) {
551            return false;
552        }
553    }
554
555    /**
556     * Receives the next message produced for this message consumer.
557     * <P>
558     * This call blocks indefinitely until a message is produced or until this
559     * message consumer is closed.
560     * <P>
561     * If this <CODE>receive</CODE> is done within a transaction, the consumer
562     * retains the message until the transaction commits.
563     *
564     * @return the next message produced for this message consumer, or null if
565     *         this message consumer is concurrently closed
566     */
567    @Override
568    public Message receive() throws JMSException {
569        checkClosed();
570        checkMessageListener();
571
572        sendPullCommand(0);
573        MessageDispatch md = dequeue(-1);
574        if (md == null) {
575            return null;
576        }
577
578        beforeMessageIsConsumed(md);
579        afterMessageIsConsumed(md, false);
580
581        return createActiveMQMessage(md);
582    }
583
584    /**
585     * @param md
586     *      the MessageDispatch that arrived from the Broker.
587     *
588     * @return an ActiveMQMessage initialized from the Message in the dispatch.
589     */
590    private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
591        ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
592        if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
593            ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
594        }
595        if (m.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
596            ((ActiveMQObjectMessage)m).setTrustAllPackages(session.getConnection().isTrustAllPackages());
597            ((ActiveMQObjectMessage)m).setTrustedPackages(session.getConnection().getTrustedPackages());
598        }
599        if (transformer != null) {
600            Message transformedMessage = transformer.consumerTransform(session, this, m);
601            if (transformedMessage != null) {
602                m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
603            }
604        }
605        if (session.isClientAcknowledge()) {
606            m.setAcknowledgeCallback(new Callback() {
607                @Override
608                public void execute() throws Exception {
609                    session.checkClosed();
610                    session.acknowledge();
611                }
612            });
613        } else if (session.isIndividualAcknowledge()) {
614            m.setAcknowledgeCallback(new Callback() {
615                @Override
616                public void execute() throws Exception {
617                    session.checkClosed();
618                    acknowledge(md);
619                }
620            });
621        }
622        return m;
623    }
624
625    /**
626     * Receives the next message that arrives within the specified timeout
627     * interval.
628     * <P>
629     * This call blocks until a message arrives, the timeout expires, or this
630     * message consumer is closed. A <CODE>timeout</CODE> of zero never
631     * expires, and the call blocks indefinitely.
632     *
633     * @param timeout the timeout value (in milliseconds), a time out of zero
634     *                never expires.
635     * @return the next message produced for this message consumer, or null if
636     *         the timeout expires or this message consumer is concurrently
637     *         closed
638     */
639    @Override
640    public Message receive(long timeout) throws JMSException {
641        checkClosed();
642        checkMessageListener();
643        if (timeout == 0) {
644            return this.receive();
645        }
646
647        sendPullCommand(timeout);
648        while (timeout > 0) {
649
650            MessageDispatch md;
651            if (info.getPrefetchSize() == 0) {
652                md = dequeue(-1); // We let the broker let us know when we timeout.
653            } else {
654                md = dequeue(timeout);
655            }
656
657            if (md == null) {
658                return null;
659            }
660
661            beforeMessageIsConsumed(md);
662            afterMessageIsConsumed(md, false);
663            return createActiveMQMessage(md);
664        }
665        return null;
666    }
667
668    /**
669     * Receives the next message if one is immediately available.
670     *
671     * @return the next message produced for this message consumer, or null if
672     *         one is not available
673     * @throws JMSException if the JMS provider fails to receive the next
674     *                 message due to some internal error.
675     */
676    @Override
677    public Message receiveNoWait() throws JMSException {
678        checkClosed();
679        checkMessageListener();
680        sendPullCommand(-1);
681
682        MessageDispatch md;
683        if (info.getPrefetchSize() == 0) {
684            md = dequeue(-1); // We let the broker let us know when we
685            // timeout.
686        } else {
687            md = dequeue(0);
688        }
689
690        if (md == null) {
691            return null;
692        }
693
694        beforeMessageIsConsumed(md);
695        afterMessageIsConsumed(md, false);
696        return createActiveMQMessage(md);
697    }
698
699    /**
700     * Closes the message consumer.
701     * <P>
702     * Since a provider may allocate some resources on behalf of a <CODE>
703     * MessageConsumer</CODE>
704     * outside the Java virtual machine, clients should close them when they are
705     * not needed. Relying on garbage collection to eventually reclaim these
706     * resources may not be timely enough.
707     * <P>
708     * This call blocks until a <CODE>receive</CODE> or message listener in
709     * progress has completed. A blocked message consumer <CODE>receive </CODE>
710     * call returns null when this message consumer is closed.
711     *
712     * @throws JMSException if the JMS provider fails to close the consumer due
713     *                 to some internal error.
714     */
715    @Override
716    public void close() throws JMSException {
717        if (!unconsumedMessages.isClosed()) {
718            if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) {
719                session.getTransactionContext().addSynchronization(new Synchronization() {
720                    @Override
721                    public void afterCommit() throws Exception {
722                        doClose();
723                    }
724
725                    @Override
726                    public void afterRollback() throws Exception {
727                        doClose();
728                    }
729                });
730            } else {
731                doClose();
732            }
733        }
734    }
735
736    void doClose() throws JMSException {
737        dispose();
738        RemoveInfo removeCommand = info.createRemoveCommand();
739        LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId);
740        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
741        this.session.asyncSendPacket(removeCommand);
742    }
743
744    void inProgressClearRequired() {
745        inProgressClearRequiredFlag.incrementAndGet();
746        // deal with delivered messages async to avoid lock contention with in progress acks
747        clearDeliveredList = true;
748        // force a rollback if we will be acking in a transaction after/during failover
749        // bc acks are async they may not get there reliably on reconnect and the consumer
750        // may not be aware of the reconnect in a timely fashion if in onMessage
751        if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) {
752            session.getTransactionContext().setRollbackOnly(true);
753        }
754    }
755
756    void clearMessagesInProgress() {
757        if (inProgressClearRequiredFlag.get() > 0) {
758            synchronized (unconsumedMessages.getMutex()) {
759                if (inProgressClearRequiredFlag.get() > 0) {
760                    LOG.debug("{} clearing unconsumed list ({}) on transport interrupt", getConsumerId(), unconsumedMessages.size());
761                    // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
762                    List<MessageDispatch> list = unconsumedMessages.removeAll();
763                    if (!this.info.isBrowser()) {
764                        for (MessageDispatch old : list) {
765                            session.connection.rollbackDuplicate(this, old.getMessage());
766                        }
767                    }
768                    // allow dispatch on this connection to resume
769                    session.connection.transportInterruptionProcessingComplete();
770                    inProgressClearRequiredFlag.decrementAndGet();
771
772                    // Wake up any blockers and allow them to recheck state.
773                    unconsumedMessages.getMutex().notifyAll();
774                }
775            }
776        }
777        clearDeliveredList();
778    }
779
780    void deliverAcks() {
781        MessageAck ack = null;
782        if (deliveryingAcknowledgements.compareAndSet(false, true)) {
783            if (isAutoAcknowledgeEach()) {
784                synchronized(deliveredMessages) {
785                    ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
786                    if (ack != null) {
787                        deliveredMessages.clear();
788                        ackCounter = 0;
789                    } else {
790                        ack = pendingAck;
791                        pendingAck = null;
792                    }
793                }
794            } else if (pendingAck != null && pendingAck.isStandardAck()) {
795                ack = pendingAck;
796                pendingAck = null;
797            }
798            if (ack != null) {
799                final MessageAck ackToSend = ack;
800
801                if (executorService == null) {
802                    executorService = Executors.newSingleThreadExecutor();
803                }
804                executorService.submit(new Runnable() {
805                    @Override
806                    public void run() {
807                        try {
808                            session.sendAck(ackToSend,true);
809                        } catch (JMSException e) {
810                            LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
811                        } finally {
812                            deliveryingAcknowledgements.set(false);
813                        }
814                    }
815                });
816            } else {
817                deliveryingAcknowledgements.set(false);
818            }
819        }
820    }
821
822    public void dispose() throws JMSException {
823        if (!unconsumedMessages.isClosed()) {
824
825            // Do we have any acks we need to send out before closing?
826            // Ack any delivered messages now.
827            if (!session.getTransacted()) {
828                deliverAcks();
829                if (isAutoAcknowledgeBatch()) {
830                    acknowledge();
831                }
832            }
833            if (executorService != null) {
834                ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
835                executorService = null;
836            }
837            if (optimizedAckTask != null) {
838                this.session.connection.getScheduler().cancel(optimizedAckTask);
839                optimizedAckTask = null;
840            }
841
842            if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
843                if (!this.info.isBrowser()) {
844                    // rollback duplicates that aren't acknowledged
845                    List<MessageDispatch> tmp = null;
846                    synchronized (this.deliveredMessages) {
847                        tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
848                    }
849                    for (MessageDispatch old : tmp) {
850                        this.session.connection.rollbackDuplicate(this, old.getMessage());
851                    }
852                    tmp.clear();
853                }
854            }
855            if (!session.isTransacted()) {
856                synchronized(deliveredMessages) {
857                    deliveredMessages.clear();
858                }
859            }
860            unconsumedMessages.close();
861            this.session.removeConsumer(this);
862            List<MessageDispatch> list = unconsumedMessages.removeAll();
863            if (!this.info.isBrowser()) {
864                for (MessageDispatch old : list) {
865                    // ensure we don't filter this as a duplicate
866                    LOG.debug("on close, rollback duplicate: {}", old.getMessage().getMessageId());
867                    session.connection.rollbackDuplicate(this, old.getMessage());
868                }
869            }
870        }
871    }
872
873    /**
874     * @throws IllegalStateException
875     */
876    protected void checkClosed() throws IllegalStateException {
877        if (unconsumedMessages.isClosed()) {
878            throw new IllegalStateException("The Consumer is closed");
879        }
880    }
881
882    /**
883     * If we have a zero prefetch specified then send a pull command to the
884     * broker to pull a message we are about to receive
885     */
886    protected void sendPullCommand(long timeout) throws JMSException {
887        clearDeliveredList();
888        if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
889            MessagePull messagePull = new MessagePull();
890            messagePull.configure(info);
891            messagePull.setTimeout(timeout);
892            session.asyncSendPacket(messagePull);
893        }
894    }
895
896    protected void checkMessageListener() throws JMSException {
897        session.checkMessageListener();
898    }
899
900    protected void setOptimizeAcknowledge(boolean value) {
901        if (optimizeAcknowledge && !value) {
902            deliverAcks();
903        }
904        optimizeAcknowledge = value;
905    }
906
907    protected void setPrefetchSize(int prefetch) {
908        deliverAcks();
909        this.info.setCurrentPrefetchSize(prefetch);
910    }
911
912    private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
913        md.setDeliverySequenceId(session.getNextDeliveryId());
914        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
915        if (!isAutoAcknowledgeBatch()) {
916            synchronized(deliveredMessages) {
917                deliveredMessages.addFirst(md);
918            }
919            if (session.getTransacted()) {
920                if (transactedIndividualAck) {
921                    immediateIndividualTransactedAck(md);
922                } else {
923                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
924                }
925            }
926        }
927    }
928
929    private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException {
930        // acks accumulate on the broker pending transaction completion to indicate
931        // delivery status
932        registerSync();
933        MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
934        ack.setTransactionId(session.getTransactionContext().getTransactionId());
935        session.sendAck(ack);
936    }
937
938    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
939        if (unconsumedMessages.isClosed()) {
940            return;
941        }
942        if (messageExpired) {
943            acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
944            stats.getExpiredMessageCount().increment();
945        } else {
946            stats.onMessage();
947            if (session.getTransacted()) {
948                // Do nothing.
949            } else if (isAutoAcknowledgeEach()) {
950                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
951                    synchronized (deliveredMessages) {
952                        if (!deliveredMessages.isEmpty()) {
953                            if (optimizeAcknowledge) {
954                                ackCounter++;
955
956                                // AMQ-3956 evaluate both expired and normal msgs as
957                                // otherwise consumer may get stalled
958                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
959                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
960                                    if (ack != null) {
961                                        deliveredMessages.clear();
962                                        ackCounter = 0;
963                                        session.sendAck(ack);
964                                        optimizeAckTimestamp = System.currentTimeMillis();
965                                    }
966                                    // AMQ-3956 - as further optimization send
967                                    // ack for expired msgs when there are any.
968                                    // This resets the deliveredCounter to 0 so that
969                                    // we won't sent standard acks with every msg just
970                                    // because the deliveredCounter just below
971                                    // 0.5 * prefetch as used in ackLater()
972                                    if (pendingAck != null && deliveredCounter > 0) {
973                                        session.sendAck(pendingAck);
974                                        pendingAck = null;
975                                        deliveredCounter = 0;
976                                    }
977                                }
978                            } else {
979                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
980                                if (ack!=null) {
981                                    deliveredMessages.clear();
982                                    session.sendAck(ack);
983                                }
984                            }
985                        }
986                    }
987                    deliveryingAcknowledgements.set(false);
988                }
989            } else if (isAutoAcknowledgeBatch()) {
990                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
991            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
992                boolean messageUnackedByConsumer = false;
993                synchronized (deliveredMessages) {
994                    messageUnackedByConsumer = deliveredMessages.contains(md);
995                }
996                if (messageUnackedByConsumer) {
997                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
998                }
999            }
1000            else {
1001                throw new IllegalStateException("Invalid session state.");
1002            }
1003        }
1004    }
1005
1006    /**
1007     * Creates a MessageAck for all messages contained in deliveredMessages.
1008     * Caller should hold the lock for deliveredMessages.
1009     *
1010     * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
1011     * @return <code>null</code> if nothing to ack.
1012     */
1013    private MessageAck makeAckForAllDeliveredMessages(byte type) {
1014        synchronized (deliveredMessages) {
1015            if (deliveredMessages.isEmpty()) {
1016                return null;
1017            }
1018
1019            MessageDispatch md = deliveredMessages.getFirst();
1020            MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
1021            ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
1022            return ack;
1023        }
1024    }
1025
1026    private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
1027
1028        // Don't acknowledge now, but we may need to let the broker know the
1029        // consumer got the message to expand the pre-fetch window
1030        if (session.getTransacted()) {
1031            registerSync();
1032        }
1033
1034        deliveredCounter++;
1035
1036        MessageAck oldPendingAck = pendingAck;
1037        pendingAck = new MessageAck(md, ackType, deliveredCounter);
1038        pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
1039        if( oldPendingAck==null ) {
1040            pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
1041        } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
1042            pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
1043        } else {
1044            // old pending ack being superseded by ack of another type, if is is not a delivered
1045            // ack and hence important, send it now so it is not lost.
1046            if (!oldPendingAck.isDeliveredAck()) {
1047                LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
1048                session.sendAck(oldPendingAck);
1049            } else {
1050                LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
1051            }
1052        }
1053        // AMQ-3956 evaluate both expired and normal msgs as
1054        // otherwise consumer may get stalled
1055        if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
1056            LOG.debug("ackLater: sending: {}", pendingAck);
1057            session.sendAck(pendingAck);
1058            pendingAck=null;
1059            deliveredCounter = 0;
1060            additionalWindowSize = 0;
1061        }
1062    }
1063
1064    private void registerSync() throws JMSException {
1065        session.doStartTransaction();
1066        if (!synchronizationRegistered) {
1067            synchronizationRegistered = true;
1068            session.getTransactionContext().addSynchronization(new Synchronization() {
1069                @Override
1070                public void beforeEnd() throws Exception {
1071                    if (transactedIndividualAck) {
1072                        clearDeliveredList();
1073                        waitForRedeliveries();
1074                        synchronized(deliveredMessages) {
1075                            rollbackOnFailedRecoveryRedelivery();
1076                        }
1077                    } else {
1078                        acknowledge();
1079                    }
1080                    synchronizationRegistered = false;
1081                }
1082
1083                @Override
1084                public void afterCommit() throws Exception {
1085                    commit();
1086                    synchronizationRegistered = false;
1087                }
1088
1089                @Override
1090                public void afterRollback() throws Exception {
1091                    rollback();
1092                    synchronizationRegistered = false;
1093                }
1094            });
1095        }
1096    }
1097
1098    /**
1099     * Acknowledge all the messages that have been delivered to the client up to
1100     * this point.
1101     *
1102     * @throws JMSException
1103     */
1104    public void acknowledge() throws JMSException {
1105        clearDeliveredList();
1106        waitForRedeliveries();
1107        synchronized(deliveredMessages) {
1108            // Acknowledge all messages so far.
1109            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
1110            if (ack == null) {
1111                return; // no msgs
1112            }
1113
1114            if (session.getTransacted()) {
1115                rollbackOnFailedRecoveryRedelivery();
1116                session.doStartTransaction();
1117                ack.setTransactionId(session.getTransactionContext().getTransactionId());
1118            }
1119
1120            pendingAck = null;
1121            session.sendAck(ack);
1122
1123            // Adjust the counters
1124            deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1125            additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1126
1127            if (!session.getTransacted()) {
1128                deliveredMessages.clear();
1129            }
1130        }
1131    }
1132
1133    private void waitForRedeliveries() {
1134        if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1135            long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1136            int numberNotReplayed;
1137            do {
1138                numberNotReplayed = 0;
1139                synchronized(deliveredMessages) {
1140                    if (previouslyDeliveredMessages != null) {
1141                        for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1142                            if (!entry.getValue()) {
1143                                numberNotReplayed++;
1144                            }
1145                        }
1146                    }
1147                }
1148                if (numberNotReplayed > 0) {
1149                    LOG.info("waiting for redelivery of {} in transaction: {}, to consumer: {}",
1150                             numberNotReplayed, this.getConsumerId(), previouslyDeliveredMessages.transactionId);
1151                    try {
1152                        Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1153                    } catch (InterruptedException outOfhere) {
1154                        break;
1155                    }
1156                }
1157            } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1158        }
1159    }
1160
1161    /*
1162     * called with deliveredMessages locked
1163     */
1164    private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1165        if (previouslyDeliveredMessages != null) {
1166            // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1167            // as messages have been dispatched else where.
1168            int numberNotReplayed = 0;
1169            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1170                if (!entry.getValue()) {
1171                    numberNotReplayed++;
1172                    LOG.debug("previously delivered message has not been replayed in transaction: {}, messageId: {}",
1173                              previouslyDeliveredMessages.transactionId, entry.getKey());
1174                }
1175            }
1176            if (numberNotReplayed > 0) {
1177                String message = "rolling back transaction ("
1178                    + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1179                    + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1180                LOG.warn(message);
1181                throw new TransactionRolledBackException(message);
1182            }
1183        }
1184    }
1185
1186    void acknowledge(MessageDispatch md) throws JMSException {
1187        acknowledge(md, MessageAck.INDIVIDUAL_ACK_TYPE);
1188    }
1189
1190    void acknowledge(MessageDispatch md, byte ackType) throws JMSException {
1191        MessageAck ack = new MessageAck(md, ackType, 1);
1192        if (ack.isExpiredAck()) {
1193            ack.setFirstMessageId(ack.getLastMessageId());
1194        }
1195        session.sendAck(ack);
1196        synchronized(deliveredMessages){
1197            deliveredMessages.remove(md);
1198        }
1199    }
1200
1201    public void commit() throws JMSException {
1202        synchronized (deliveredMessages) {
1203            deliveredMessages.clear();
1204            clearPreviouslyDelivered();
1205        }
1206        redeliveryDelay = 0;
1207    }
1208
1209    public void rollback() throws JMSException {
1210        clearDeliveredList();
1211        synchronized (unconsumedMessages.getMutex()) {
1212            if (optimizeAcknowledge) {
1213                // remove messages read but not acked at the broker yet through
1214                // optimizeAcknowledge
1215                if (!this.info.isBrowser()) {
1216                    synchronized(deliveredMessages) {
1217                        for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
1218                            // ensure we don't filter this as a duplicate
1219                            MessageDispatch md = deliveredMessages.removeLast();
1220                            session.connection.rollbackDuplicate(this, md.getMessage());
1221                        }
1222                    }
1223                }
1224            }
1225            synchronized(deliveredMessages) {
1226                rollbackPreviouslyDeliveredAndNotRedelivered();
1227                if (deliveredMessages.isEmpty()) {
1228                    return;
1229                }
1230
1231                // use initial delay for first redelivery
1232                MessageDispatch lastMd = deliveredMessages.getFirst();
1233                final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
1234                if (currentRedeliveryCount > 0) {
1235                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
1236                } else {
1237                    redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
1238                }
1239                MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
1240
1241                for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1242                    MessageDispatch md = iter.next();
1243                    md.getMessage().onMessageRolledBack();
1244                    // ensure we don't filter this as a duplicate
1245                    session.connection.rollbackDuplicate(this, md.getMessage());
1246                }
1247
1248                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
1249                    && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
1250                    // We need to NACK the messages so that they get sent to the
1251                    // DLQ.
1252                    // Acknowledge the last message.
1253
1254                    MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
1255                    ack.setFirstMessageId(firstMsgId);
1256                    ack.setPoisonCause(new Throwable("Exceeded redelivery policy limit:" + redeliveryPolicy
1257                            + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
1258                    session.sendAck(ack,true);
1259                    // Adjust the window size.
1260                    additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1261                    redeliveryDelay = 0;
1262
1263                    deliveredCounter -= deliveredMessages.size();
1264                    deliveredMessages.clear();
1265
1266                } else {
1267
1268                    // only redelivery_ack after first delivery
1269                    if (currentRedeliveryCount > 0) {
1270                        MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
1271                        ack.setFirstMessageId(firstMsgId);
1272                        session.sendAck(ack,true);
1273                    }
1274
1275                    // stop the delivery of messages.
1276                    if (nonBlockingRedelivery) {
1277                        if (!unconsumedMessages.isClosed()) {
1278
1279                            final LinkedList<MessageDispatch> pendingRedeliveries =
1280                                new LinkedList<MessageDispatch>(deliveredMessages);
1281
1282                            Collections.reverse(pendingRedeliveries);
1283
1284                            deliveredCounter -= deliveredMessages.size();
1285                            deliveredMessages.clear();
1286
1287                            // Start up the delivery again a little later.
1288                            session.getScheduler().executeAfterDelay(new Runnable() {
1289                                @Override
1290                                public void run() {
1291                                    try {
1292                                        if (!unconsumedMessages.isClosed()) {
1293                                            for(MessageDispatch dispatch : pendingRedeliveries) {
1294                                                session.dispatch(dispatch);
1295                                            }
1296                                        }
1297                                    } catch (Exception e) {
1298                                        session.connection.onAsyncException(e);
1299                                    }
1300                                }
1301                            }, redeliveryDelay);
1302                        }
1303
1304                    } else {
1305                        unconsumedMessages.stop();
1306
1307                        for (MessageDispatch md : deliveredMessages) {
1308                            unconsumedMessages.enqueueFirst(md);
1309                        }
1310
1311                        deliveredCounter -= deliveredMessages.size();
1312                        deliveredMessages.clear();
1313
1314                        if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
1315                            // Start up the delivery again a little later.
1316                            session.getScheduler().executeAfterDelay(new Runnable() {
1317                                @Override
1318                                public void run() {
1319                                    try {
1320                                        if (started.get()) {
1321                                            start();
1322                                        }
1323                                    } catch (JMSException e) {
1324                                        session.connection.onAsyncException(e);
1325                                    }
1326                                }
1327                            }, redeliveryDelay);
1328                        } else {
1329                            start();
1330                        }
1331                    }
1332                }
1333            }
1334        }
1335        if (messageListener.get() != null) {
1336            session.redispatch(this, unconsumedMessages);
1337        }
1338    }
1339
1340    /*
1341     * called with unconsumedMessages && deliveredMessages locked
1342     * remove any message not re-delivered as they can't be replayed to this
1343     * consumer on rollback
1344     */
1345    private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1346        if (previouslyDeliveredMessages != null) {
1347            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1348                if (!entry.getValue()) {
1349                    LOG.trace("rollback non redelivered: {}" + entry.getKey());
1350                    removeFromDeliveredMessages(entry.getKey());
1351                }
1352            }
1353            clearPreviouslyDelivered();
1354        }
1355    }
1356
1357    /*
1358     * called with deliveredMessages locked
1359     */
1360    private void removeFromDeliveredMessages(MessageId key) {
1361        Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1362        while (iterator.hasNext()) {
1363            MessageDispatch candidate = iterator.next();
1364            if (key.equals(candidate.getMessage().getMessageId())) {
1365                session.connection.rollbackDuplicate(this, candidate.getMessage());
1366                iterator.remove();
1367                break;
1368            }
1369        }
1370    }
1371
1372    /*
1373     * called with deliveredMessages locked
1374     */
1375    private void clearPreviouslyDelivered() {
1376        if (previouslyDeliveredMessages != null) {
1377            previouslyDeliveredMessages.clear();
1378            previouslyDeliveredMessages = null;
1379        }
1380    }
1381
1382    @Override
1383    public void dispatch(MessageDispatch md) {
1384        MessageListener listener = this.messageListener.get();
1385        try {
1386            clearMessagesInProgress();
1387            clearDeliveredList();
1388            synchronized (unconsumedMessages.getMutex()) {
1389                if (!unconsumedMessages.isClosed()) {
1390                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1391                        if (listener != null && unconsumedMessages.isRunning()) {
1392                            if (redeliveryExceeded(md)) {
1393                                posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
1394                                return;
1395                            }
1396                            ActiveMQMessage message = createActiveMQMessage(md);
1397                            beforeMessageIsConsumed(md);
1398                            try {
1399                                boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired();
1400                                if (!expired) {
1401                                    listener.onMessage(message);
1402                                }
1403                                afterMessageIsConsumed(md, expired);
1404                            } catch (RuntimeException e) {
1405                                LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e);
1406                                md.setRollbackCause(e);
1407                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
1408                                    // schedual redelivery and possible dlq processing
1409                                    rollback();
1410                                } else {
1411                                    // Transacted or Client ack: Deliver the next message.
1412                                    afterMessageIsConsumed(md, false);
1413                                }
1414                            }
1415                        } else {
1416                            if (!unconsumedMessages.isRunning()) {
1417                                // delayed redelivery, ensure it can be re delivered
1418                                session.connection.rollbackDuplicate(this, md.getMessage());
1419                            }
1420                            unconsumedMessages.enqueue(md);
1421                            if (availableListener != null) {
1422                                availableListener.onMessageAvailable(this);
1423                            }
1424                        }
1425                    } else {
1426                        // deal with duplicate delivery
1427                        ConsumerId consumerWithPendingTransaction;
1428                        if (redeliveryExpectedInCurrentTransaction(md, true)) {
1429                            LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
1430                            if (transactedIndividualAck) {
1431                                immediateIndividualTransactedAck(md);
1432                            } else {
1433                                session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
1434                            }
1435                        } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
1436                            LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
1437                            session.getConnection().rollbackDuplicate(this, md.getMessage());
1438                            dispatch(md);
1439                        } else {
1440                            LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
1441                            posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
1442                        }
1443                    }
1444                }
1445            }
1446            if (++dispatchedCount % 1000 == 0) {
1447                dispatchedCount = 0;
1448                Thread.yield();
1449            }
1450        } catch (Exception e) {
1451            session.connection.onClientInternalException(e);
1452        }
1453    }
1454
1455    private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) {
1456        if (session.isTransacted()) {
1457            synchronized (deliveredMessages) {
1458                if (previouslyDeliveredMessages != null) {
1459                    if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId())) {
1460                        if (markReceipt) {
1461                            previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
1462                        }
1463                        return true;
1464                    }
1465                }
1466            }
1467        }
1468        return false;
1469    }
1470
1471    private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) {
1472        for (ActiveMQSession activeMQSession: session.connection.getSessions()) {
1473            for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) {
1474                if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) {
1475                    return activeMQMessageConsumer.getConsumerId();
1476                }
1477            }
1478        }
1479        return null;
1480    }
1481
1482    // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1483    private void clearDeliveredList() {
1484        if (clearDeliveredList) {
1485            synchronized (deliveredMessages) {
1486                if (clearDeliveredList) {
1487                    if (!deliveredMessages.isEmpty()) {
1488                        if (session.isTransacted()) {
1489
1490                            if (previouslyDeliveredMessages == null) {
1491                                previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1492                            }
1493                            for (MessageDispatch delivered : deliveredMessages) {
1494                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1495                            }
1496                            LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
1497                                      getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
1498                        } else {
1499                            if (session.isClientAcknowledge()) {
1500                                LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
1501                                // allow redelivery
1502                                if (!this.info.isBrowser()) {
1503                                    for (MessageDispatch md: deliveredMessages) {
1504                                        this.session.connection.rollbackDuplicate(this, md.getMessage());
1505                                    }
1506                                }
1507                            }
1508                            LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
1509                            deliveredMessages.clear();
1510                            pendingAck = null;
1511                        }
1512                    }
1513                    clearDeliveredList = false;
1514                }
1515            }
1516        }
1517    }
1518
1519    public int getMessageSize() {
1520        return unconsumedMessages.size();
1521    }
1522
1523    public void start() throws JMSException {
1524        if (unconsumedMessages.isClosed()) {
1525            return;
1526        }
1527        started.set(true);
1528        unconsumedMessages.start();
1529        session.executor.wakeup();
1530    }
1531
1532    public void stop() {
1533        started.set(false);
1534        unconsumedMessages.stop();
1535    }
1536
1537    @Override
1538    public String toString() {
1539        return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1540               + " }";
1541    }
1542
1543    /**
1544     * Delivers a message to the message listener.
1545     *
1546     * @return true if another execution is needed.
1547     *
1548     * @throws JMSException
1549     */
1550    public boolean iterate() {
1551        MessageListener listener = this.messageListener.get();
1552        if (listener != null) {
1553            MessageDispatch md = unconsumedMessages.dequeueNoWait();
1554            if (md != null) {
1555                dispatch(md);
1556                return true;
1557            }
1558        }
1559        return false;
1560    }
1561
1562    public boolean isInUse(ActiveMQTempDestination destination) {
1563        return info.getDestination().equals(destination);
1564    }
1565
1566    public long getLastDeliveredSequenceId() {
1567        return lastDeliveredSequenceId;
1568    }
1569
1570    public IOException getFailureError() {
1571        return failureError;
1572    }
1573
1574    public void setFailureError(IOException failureError) {
1575        this.failureError = failureError;
1576    }
1577
1578    /**
1579     * @return the optimizedAckScheduledAckInterval
1580     */
1581    public long getOptimizedAckScheduledAckInterval() {
1582        return optimizedAckScheduledAckInterval;
1583    }
1584
1585    /**
1586     * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set
1587     */
1588    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException {
1589        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1590
1591        if (this.optimizedAckTask != null) {
1592            try {
1593                this.session.connection.getScheduler().cancel(optimizedAckTask);
1594            } catch (JMSException e) {
1595                LOG.debug("Caught exception while cancelling old optimized ack task", e);
1596                throw e;
1597            }
1598            this.optimizedAckTask = null;
1599        }
1600
1601        // Should we periodically send out all outstanding acks.
1602        if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) {
1603            this.optimizedAckTask = new Runnable() {
1604
1605                @Override
1606                public void run() {
1607                    try {
1608                        if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
1609                            LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
1610                            deliverAcks();
1611                        }
1612                    } catch (Exception e) {
1613                        LOG.debug("Optimized Ack Task caught exception during ack", e);
1614                    }
1615                }
1616            };
1617
1618            try {
1619                this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval);
1620            } catch (JMSException e) {
1621                LOG.debug("Caught exception while scheduling new optimized ack task", e);
1622                throw e;
1623            }
1624        }
1625    }
1626
1627    public boolean hasMessageListener() {
1628        return messageListener.get() != null;
1629    }
1630
1631    public boolean isConsumerExpiryCheckEnabled() {
1632        return consumerExpiryCheckEnabled;
1633    }
1634
1635    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
1636        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
1637    }
1638}