001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Executors;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    import java.util.concurrent.atomic.AtomicInteger;
031    import java.util.concurrent.atomic.AtomicReference;
032    
033    import javax.jms.IllegalStateException;
034    import javax.jms.InvalidDestinationException;
035    import javax.jms.JMSException;
036    import javax.jms.Message;
037    import javax.jms.MessageConsumer;
038    import javax.jms.MessageListener;
039    import javax.jms.TransactionRolledBackException;
040    
041    import org.apache.activemq.blob.BlobDownloader;
042    import org.apache.activemq.command.ActiveMQBlobMessage;
043    import org.apache.activemq.command.ActiveMQDestination;
044    import org.apache.activemq.command.ActiveMQMessage;
045    import org.apache.activemq.command.ActiveMQTempDestination;
046    import org.apache.activemq.command.CommandTypes;
047    import org.apache.activemq.command.ConsumerId;
048    import org.apache.activemq.command.ConsumerInfo;
049    import org.apache.activemq.command.MessageAck;
050    import org.apache.activemq.command.MessageDispatch;
051    import org.apache.activemq.command.MessageId;
052    import org.apache.activemq.command.MessagePull;
053    import org.apache.activemq.command.RemoveInfo;
054    import org.apache.activemq.command.TransactionId;
055    import org.apache.activemq.management.JMSConsumerStatsImpl;
056    import org.apache.activemq.management.StatsCapable;
057    import org.apache.activemq.management.StatsImpl;
058    import org.apache.activemq.selector.SelectorParser;
059    import org.apache.activemq.transaction.Synchronization;
060    import org.apache.activemq.util.Callback;
061    import org.apache.activemq.util.IntrospectionSupport;
062    import org.apache.activemq.util.JMSExceptionSupport;
063    import org.apache.activemq.util.ThreadPoolUtils;
064    import org.slf4j.Logger;
065    import org.slf4j.LoggerFactory;
066    
067    /**
068     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
069     * from a destination. A <CODE> MessageConsumer</CODE> object is created by
070     * passing a <CODE>Destination</CODE> object to a message-consumer creation
071     * method supplied by a session.
072     * <P>
073     * <CODE>MessageConsumer</CODE> is the parent interface for all message
074     * consumers.
075     * <P>
076     * A message consumer can be created with a message selector. A message selector
077     * allows the client to restrict the messages delivered to the message consumer
078     * to those that match the selector.
079     * <P>
080     * A client may either synchronously receive a message consumer's messages or
081     * have the consumer asynchronously deliver them as they arrive.
082     * <P>
083     * For synchronous receipt, a client can request the next message from a message
084     * consumer using one of its <CODE> receive</CODE> methods. There are several
085     * variations of <CODE>receive</CODE> that allow a client to poll or wait for
086     * the next message.
087     * <P>
088     * For asynchronous delivery, a client can register a
089     * <CODE>MessageListener</CODE> object with a message consumer. As messages
090     * arrive at the message consumer, it delivers them by calling the
091     * <CODE>MessageListener</CODE>'s<CODE>
092     * onMessage</CODE> method.
093     * <P>
094     * It is a client programming error for a <CODE>MessageListener</CODE> to
095     * throw an exception.
096     *
097     *
098     * @see javax.jms.MessageConsumer
099     * @see javax.jms.QueueReceiver
100     * @see javax.jms.TopicSubscriber
101     * @see javax.jms.Session
102     */
103    public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
104    
105        @SuppressWarnings("serial")
106        class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
107            final TransactionId transactionId;
108            public PreviouslyDeliveredMap(TransactionId transactionId) {
109                this.transactionId = transactionId;
110            }
111        }
112    
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
    // The session this consumer was created with; the consumer registers itself
    // with it in the constructor.
    protected final ActiveMQSession session;
    // Consumer description built in the constructor and sent to the broker.
    protected final ConsumerInfo info;

    // These are the messages waiting to be delivered to the client
    protected final MessageDispatchChannel unconsumedMessages;

    // These are the messages that were delivered to the consumer but that have
    // not been acknowledged. The list is kept in reverse order since we
    // always walk it in reverse order.
    private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
    // track duplicate deliveries in a transaction such that the tx integrity can be validated
    private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
    private int deliveredCounter;
    private int additionalWindowSize;
    private long redeliveryDelay;
    // Reset to zero by deliverAcks() when the delivered list is acknowledged and cleared.
    private int ackCounter;
    private int dispatchedCount;
    // Current asynchronous listener, or null when none is set.
    private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
    private final JMSConsumerStatsImpl stats;

    // Effective message selector resolved in the constructor; null when there is none.
    private final String selector;
    private boolean synchronizationRegistered;
    private final AtomicBoolean started = new AtomicBoolean(false);

    private MessageAvailableListener availableListener;

    private RedeliveryPolicy redeliveryPolicy;
    // True only for auto-acknowledge, non-browser consumers on a connection with
    // optimizeAcknowledge enabled (computed in the constructor).
    private boolean optimizeAcknowledge;
    // Flipped false -> true by deliverAcks() via compareAndSet before it builds an ack.
    private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
    // Created lazily by deliverAcks() the first time an ack has to be sent.
    private ExecutorService executorService;
    private MessageTransformer transformer;
    // Set by inProgressClearRequired().
    private boolean clearDispatchList;
    // Incremented by inProgressClearRequired(), decremented by clearMessagesInProgress().
    AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);

    private MessageAck pendingAck;
    // Reported to the broker in the RemoveInfo command sent by doClose().
    private long lastDeliveredSequenceId;

    // When non-null, dequeue() throws it (wrapped as a JMSException) instead of returning null.
    private IOException failureError;

    private long optimizeAckTimestamp = System.currentTimeMillis();
    private long optimizeAcknowledgeTimeOut = 0;
    private long optimizedAckScheduledAckInterval = 0;
    private Runnable optimizedAckTask;
    private long failoverRedeliveryWaitPeriod = 0;
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;
160    
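    /**
     * Creates a MessageConsumer, registers it with the session and announces
     * it to the broker.
     *
     * @param session the session creating this consumer
     * @param consumerId the id used to build this consumer's ConsumerInfo
     * @param dest the destination to consume from; must not be null and must
     *            have a physical name
     * @param name the subscription name to set on the ConsumerInfo
     * @param selector the message selector; null or blank means no selector
     *            unless one is supplied through the destination options
     * @param prefetch the prefetch size
     * @param maximumPendingMessageCount the maximum pending message limit
     * @param noLocal the noLocal flag to set on the ConsumerInfo
     * @param browser true if this consumer is a browser
     * @param dispatchAsync the dispatchAsync flag to set on the ConsumerInfo
     * @param messageListener listener to install, or null for none
     * @throws JMSException if the destination is invalid, the destination's
     *             consumer options cannot all be applied, or the consumer
     *             cannot be registered with the broker
     */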
176        public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
177                String name, String selector, int prefetch,
178                int maximumPendingMessageCount, boolean noLocal, boolean browser,
179                boolean dispatchAsync, MessageListener messageListener) throws JMSException {
180            if (dest == null) {
181                throw new InvalidDestinationException("Don't understand null destinations");
182            } else if (dest.getPhysicalName() == null) {
183                throw new InvalidDestinationException("The destination object was not given a physical name.");
184            } else if (dest.isTemporary()) {
185                String physicalName = dest.getPhysicalName();
186    
187                if (physicalName == null) {
188                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
189                }
190    
191                String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
192    
193                if (physicalName.indexOf(connectionID) < 0) {
194                    throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
195                }
196    
197                if (session.connection.isDeleted(dest)) {
198                    throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
199                }
200                if (prefetch < 0) {
201                    throw new JMSException("Cannot have a prefetch size less than zero");
202                }
203            }
204            if (session.connection.isMessagePrioritySupported()) {
205                this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
206            }else {
207                this.unconsumedMessages = new FifoMessageDispatchChannel();
208            }
209    
210            this.session = session;
211            this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
212            setTransformer(session.getTransformer());
213    
214            this.info = new ConsumerInfo(consumerId);
215            this.info.setExclusive(this.session.connection.isExclusiveConsumer());
216            this.info.setSubscriptionName(name);
217            this.info.setPrefetchSize(prefetch);
218            this.info.setCurrentPrefetchSize(prefetch);
219            this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
220            this.info.setNoLocal(noLocal);
221            this.info.setDispatchAsync(dispatchAsync);
222            this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
223            this.info.setSelector(null);
224    
225            // Allows the options on the destination to configure the consumerInfo
226            if (dest.getOptions() != null) {
227                Map<String, Object> options = IntrospectionSupport.extractProperties(
228                    new HashMap<String, Object>(dest.getOptions()), "consumer.");
229                IntrospectionSupport.setProperties(this.info, options);
230                if (options.size() > 0) {
231                    String msg = "There are " + options.size()
232                        + " consumer options that couldn't be set on the consumer."
233                        + " Check the options are spelled correctly."
234                        + " Unknown parameters=[" + options + "]."
235                        + " This consumer cannot be started.";
236                    LOG.warn(msg);
237                    throw new ConfigurationException(msg);
238                }
239            }
240    
241            this.info.setDestination(dest);
242            this.info.setBrowser(browser);
243            if (selector != null && selector.trim().length() != 0) {
244                // Validate the selector
245                SelectorParser.parse(selector);
246                this.info.setSelector(selector);
247                this.selector = selector;
248            } else if (info.getSelector() != null) {
249                // Validate the selector
250                SelectorParser.parse(this.info.getSelector());
251                this.selector = this.info.getSelector();
252            } else {
253                this.selector = null;
254            }
255    
256            this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
257            this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
258                                       && !info.isBrowser();
259            if (this.optimizeAcknowledge) {
260                this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
261                setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
262            }
263    
264            this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
265            this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
266            this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
267            this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
268            if (messageListener != null) {
269                setMessageListener(messageListener);
270            }
271            try {
272                this.session.addConsumer(this);
273                this.session.syncSendPacket(info);
274            } catch (JMSException e) {
275                this.session.removeConsumer(this);
276                throw e;
277            }
278    
279            if (session.connection.isStarted()) {
280                start();
281            }
282        }
283    
284        private boolean isAutoAcknowledgeEach() {
285            return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
286        }
287    
288        private boolean isAutoAcknowledgeBatch() {
289            return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
290        }
291    
292        public StatsImpl getStats() {
293            return stats;
294        }
295    
296        public JMSConsumerStatsImpl getConsumerStats() {
297            return stats;
298        }
299    
300        public RedeliveryPolicy getRedeliveryPolicy() {
301            return redeliveryPolicy;
302        }
303    
304        /**
305         * Sets the redelivery policy used when messages are redelivered
306         */
307        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
308            this.redeliveryPolicy = redeliveryPolicy;
309        }
310    
311        public MessageTransformer getTransformer() {
312            return transformer;
313        }
314    
315        /**
316         * Sets the transformer used to transform messages before they are sent on
317         * to the JMS bus
318         */
319        public void setTransformer(MessageTransformer transformer) {
320            this.transformer = transformer;
321        }
322    
    /**
     * @return the id of this consumer, as held by its ConsumerInfo.
     */
326        public ConsumerId getConsumerId() {
327            return info.getConsumerId();
328        }
329    
330        /**
331         * @return the consumer name - used for durable consumers
332         */
333        public String getConsumerName() {
334            return this.info.getSubscriptionName();
335        }
336    
337        /**
338         * @return true if this consumer does not accept locally produced messages
339         */
340        protected boolean isNoLocal() {
341            return info.isNoLocal();
342        }
343    
    /**
     * Indicates whether this consumer is a browser.
     *
     * @return true if this consumer is a browser
     */
349        protected boolean isBrowser() {
350            return info.isBrowser();
351        }
352    
353        /**
354         * @return ActiveMQDestination
355         */
356        protected ActiveMQDestination getDestination() {
357            return info.getDestination();
358        }
359    
360        /**
361         * @return Returns the prefetchNumber.
362         */
363        public int getPrefetchNumber() {
364            return info.getPrefetchSize();
365        }
366    
367        /**
368         * @return true if this is a durable topic subscriber
369         */
370        public boolean isDurableSubscriber() {
371            return info.getSubscriptionName() != null && info.getDestination().isTopic();
372        }
373    
374        /**
375         * Gets this message consumer's message selector expression.
376         *
377         * @return this message consumer's message selector, or null if no message
378         *         selector exists for the message consumer (that is, if the message
379         *         selector was not set or was set to null or the empty string)
     * @throws JMSException if the JMS provider fails to get the message
     *                 selector due to some internal error.
382         */
383        public String getMessageSelector() throws JMSException {
384            checkClosed();
385            return selector;
386        }
387    
388        /**
389         * Gets the message consumer's <CODE>MessageListener</CODE>.
390         *
391         * @return the listener for the message consumer, or null if no listener is
392         *         set
393         * @throws JMSException if the JMS provider fails to get the message
394         *                 listener due to some internal error.
395         * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
396         */
397        public MessageListener getMessageListener() throws JMSException {
398            checkClosed();
399            return this.messageListener.get();
400        }
401    
402        /**
403         * Sets the message consumer's <CODE>MessageListener</CODE>.
404         * <P>
405         * Setting the message listener to null is the equivalent of unsetting the
406         * message listener for the message consumer.
407         * <P>
408         * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
409         * while messages are being consumed by an existing listener or the consumer
410         * is being used to consume messages synchronously is undefined.
411         *
412         * @param listener the listener to which the messages are to be delivered
     * @throws JMSException if the JMS provider fails to set the message
     *                 listener due to some internal error, or if this
     *                 consumer's prefetch size is zero.
415         * @see javax.jms.MessageConsumer#getMessageListener
416         */
417        public void setMessageListener(MessageListener listener) throws JMSException {
418            checkClosed();
419            if (info.getPrefetchSize() == 0) {
420                throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
421            }
422            if (listener != null) {
423                boolean wasRunning = session.isRunning();
424                if (wasRunning) {
425                    session.stop();
426                }
427    
428                this.messageListener.set(listener);
429                session.redispatch(this, unconsumedMessages);
430    
431                if (wasRunning) {
432                    session.start();
433                }
434            } else {
435                this.messageListener.set(null);
436            }
437        }
438    
439        public MessageAvailableListener getAvailableListener() {
440            return availableListener;
441        }
442    
443        /**
444         * Sets the listener used to notify synchronous consumers that there is a
445         * message available so that the {@link MessageConsumer#receiveNoWait()} can
446         * be called.
447         */
448        public void setAvailableListener(MessageAvailableListener availableListener) {
449            this.availableListener = availableListener;
450        }
451    
    /**
     * Used to get an enqueued message from the unconsumedMessages list. The
     * amount of time this method blocks is based on the timeout value. - if
     * timeout==-1 then it blocks until a message is received. - if timeout==0
     * then it tries to not block at all, it returns a message if it is
     * available - if timeout>0 then it blocks up to timeout milliseconds.
     * Expired messages will be consumed by this method.
     *
     * @throws JMSException if a failure error has been recorded for this
     *                 consumer, or if the calling thread is interrupted
     *                 while waiting.
     * @return null if we timeout or if the consumer is closed.
     */
463        private MessageDispatch dequeue(long timeout) throws JMSException {
464            try {
465                long deadline = 0;
466                if (timeout > 0) {
467                    deadline = System.currentTimeMillis() + timeout;
468                }
469                while (true) {
470                    MessageDispatch md = unconsumedMessages.dequeue(timeout);
471                    if (md == null) {
472                        if (timeout > 0 && !unconsumedMessages.isClosed()) {
473                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
474                        } else {
475                            if (failureError != null) {
476                                throw JMSExceptionSupport.create(failureError);
477                            } else {
478                                return null;
479                            }
480                        }
481                    } else if (md.getMessage() == null) {
482                        return null;
483                    } else if (md.getMessage().isExpired()) {
484                        if (LOG.isDebugEnabled()) {
485                            LOG.debug(getConsumerId() + " received expired message: " + md);
486                        }
487                        beforeMessageIsConsumed(md);
488                        afterMessageIsConsumed(md, true);
489                        if (timeout > 0) {
490                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
491                        }
492                    } else {
493                        if (LOG.isTraceEnabled()) {
494                            LOG.trace(getConsumerId() + " received message: " + md);
495                        }
496                        return md;
497                    }
498                }
499            } catch (InterruptedException e) {
500                Thread.currentThread().interrupt();
501                throw JMSExceptionSupport.create(e);
502            }
503        }
504    
505        /**
506         * Receives the next message produced for this message consumer.
507         * <P>
508         * This call blocks indefinitely until a message is produced or until this
509         * message consumer is closed.
510         * <P>
511         * If this <CODE>receive</CODE> is done within a transaction, the consumer
512         * retains the message until the transaction commits.
513         *
514         * @return the next message produced for this message consumer, or null if
515         *         this message consumer is concurrently closed
516         */
517        public Message receive() throws JMSException {
518            checkClosed();
519            checkMessageListener();
520    
521            sendPullCommand(0);
522            MessageDispatch md = dequeue(-1);
523            if (md == null) {
524                return null;
525            }
526    
527            beforeMessageIsConsumed(md);
528            afterMessageIsConsumed(md, false);
529    
530            return createActiveMQMessage(md);
531        }
532    
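    /**
     * Creates the message handed to the client from a dispatch: copies the
     * dispatched message, applies the consumer transformer if one is set, and
     * installs an acknowledge callback for client- and individual-acknowledge
     * sessions.
     *
     * @param md the dispatch whose message is copied
     * @return the copied, and possibly transformed, message
     * @throws JMSException if copying or transforming the message fails
     */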
537        private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
538            ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
539            if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
540                ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
541            }
542            if (transformer != null) {
543                Message transformedMessage = transformer.consumerTransform(session, this, m);
544                if (transformedMessage != null) {
545                    m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
546                }
547            }
548            if (session.isClientAcknowledge()) {
549                m.setAcknowledgeCallback(new Callback() {
550                    public void execute() throws Exception {
551                        session.checkClosed();
552                        session.acknowledge();
553                    }
554                });
555            } else if (session.isIndividualAcknowledge()) {
556                m.setAcknowledgeCallback(new Callback() {
557                    public void execute() throws Exception {
558                        session.checkClosed();
559                        acknowledge(md);
560                    }
561                });
562            }
563            return m;
564        }
565    
566        /**
567         * Receives the next message that arrives within the specified timeout
568         * interval.
569         * <P>
570         * This call blocks until a message arrives, the timeout expires, or this
571         * message consumer is closed. A <CODE>timeout</CODE> of zero never
572         * expires, and the call blocks indefinitely.
573         *
574         * @param timeout the timeout value (in milliseconds), a time out of zero
575         *                never expires.
576         * @return the next message produced for this message consumer, or null if
577         *         the timeout expires or this message consumer is concurrently
578         *         closed
579         */
580        public Message receive(long timeout) throws JMSException {
581            checkClosed();
582            checkMessageListener();
583            if (timeout == 0) {
584                return this.receive();
585            }
586    
587            sendPullCommand(timeout);
588            while (timeout > 0) {
589    
590                MessageDispatch md;
591                if (info.getPrefetchSize() == 0) {
592                    md = dequeue(-1); // We let the broker let us know when we timeout.
593                } else {
594                    md = dequeue(timeout);
595                }
596    
597                if (md == null) {
598                    return null;
599                }
600    
601                beforeMessageIsConsumed(md);
602                afterMessageIsConsumed(md, false);
603                return createActiveMQMessage(md);
604            }
605            return null;
606        }
607    
608        /**
609         * Receives the next message if one is immediately available.
610         *
611         * @return the next message produced for this message consumer, or null if
612         *         one is not available
613         * @throws JMSException if the JMS provider fails to receive the next
614         *                 message due to some internal error.
615         */
616        public Message receiveNoWait() throws JMSException {
617            checkClosed();
618            checkMessageListener();
619            sendPullCommand(-1);
620    
621            MessageDispatch md;
622            if (info.getPrefetchSize() == 0) {
623                md = dequeue(-1); // We let the broker let us know when we
624                // timeout.
625            } else {
626                md = dequeue(0);
627            }
628    
629            if (md == null) {
630                return null;
631            }
632    
633            beforeMessageIsConsumed(md);
634            afterMessageIsConsumed(md, false);
635            return createActiveMQMessage(md);
636        }
637    
638        /**
639         * Closes the message consumer.
640         * <P>
641         * Since a provider may allocate some resources on behalf of a <CODE>
642         * MessageConsumer</CODE>
643         * outside the Java virtual machine, clients should close them when they are
644         * not needed. Relying on garbage collection to eventually reclaim these
645         * resources may not be timely enough.
646         * <P>
647         * This call blocks until a <CODE>receive</CODE> or message listener in
648         * progress has completed. A blocked message consumer <CODE>receive </CODE>
649         * call returns null when this message consumer is closed.
650         *
651         * @throws JMSException if the JMS provider fails to close the consumer due
652         *                 to some internal error.
653         */
654        public void close() throws JMSException {
655            if (!unconsumedMessages.isClosed()) {
656                if (session.getTransactionContext().isInTransaction()) {
657                    session.getTransactionContext().addSynchronization(new Synchronization() {
658                        @Override
659                        public void afterCommit() throws Exception {
660                            doClose();
661                        }
662    
663                        @Override
664                        public void afterRollback() throws Exception {
665                            doClose();
666                        }
667                    });
668                } else {
669                    doClose();
670                }
671            }
672        }
673    
674        void doClose() throws JMSException {
675            // Store interrupted state and clear so that Transport operations don't
676            // throw InterruptedException and we ensure that resources are clened up.
677            boolean interrupted = Thread.interrupted();
678            dispose();
679            RemoveInfo removeCommand = info.createRemoveCommand();
680            if (LOG.isDebugEnabled()) {
681                LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
682            }
683            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
684            this.session.asyncSendPacket(removeCommand);
685            if (interrupted) {
686                Thread.currentThread().interrupt();
687            }
688        }
689    
690        void inProgressClearRequired() {
691            inProgressClearRequiredFlag.incrementAndGet();
692            // deal with delivered messages async to avoid lock contention with in progress acks
693            clearDispatchList = true;
694        }
695    
696        void clearMessagesInProgress() {
697            if (inProgressClearRequiredFlag.get() > 0) {
698                synchronized (unconsumedMessages.getMutex()) {
699                    if (inProgressClearRequiredFlag.get() > 0) {
700                        if (LOG.isDebugEnabled()) {
701                            LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
702                        }
703                        // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
704                        List<MessageDispatch> list = unconsumedMessages.removeAll();
705                        if (!this.info.isBrowser()) {
706                            for (MessageDispatch old : list) {
707                                session.connection.rollbackDuplicate(this, old.getMessage());
708                            }
709                        }
710                        // allow dispatch on this connection to resume
711                        session.connection.transportInterruptionProcessingComplete();
712                        inProgressClearRequiredFlag.decrementAndGet();
713    
714                        // Wake up any blockers and allow them to recheck state.
715                        unconsumedMessages.getMutex().notifyAll();
716                    }
717                }
718            }
719        }
720    
721        void deliverAcks() {
722            MessageAck ack = null;
723            if (deliveryingAcknowledgements.compareAndSet(false, true)) {
724                if (isAutoAcknowledgeEach()) {
725                    synchronized(deliveredMessages) {
726                        ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
727                        if (ack != null) {
728                            deliveredMessages.clear();
729                            ackCounter = 0;
730                        } else {
731                            ack = pendingAck;
732                            pendingAck = null;
733                        }
734                    }
735                } else if (pendingAck != null && pendingAck.isStandardAck()) {
736                    ack = pendingAck;
737                    pendingAck = null;
738                }
739                if (ack != null) {
740                    final MessageAck ackToSend = ack;
741    
742                    if (executorService == null) {
743                        executorService = Executors.newSingleThreadExecutor();
744                    }
745                    executorService.submit(new Runnable() {
746                        public void run() {
747                            try {
748                                session.sendAck(ackToSend,true);
749                            } catch (JMSException e) {
750                                LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
751                            } finally {
752                                deliveryingAcknowledgements.set(false);
753                            }
754                        }
755                    });
756                } else {
757                    deliveryingAcknowledgements.set(false);
758                }
759            }
760        }
761    
762        public void dispose() throws JMSException {
763            if (!unconsumedMessages.isClosed()) {
764    
765                // Do we have any acks we need to send out before closing?
766                // Ack any delivered messages now.
767                if (!session.getTransacted()) {
768                    deliverAcks();
769                    if (isAutoAcknowledgeBatch()) {
770                        acknowledge();
771                    }
772                }
773                if (executorService != null) {
774                    ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
775                    executorService = null;
776                }
777                if (optimizedAckTask != null) {
778                    this.session.connection.getScheduler().cancel(optimizedAckTask);
779                    optimizedAckTask = null;
780                }
781    
782                if (session.isClientAcknowledge()) {
783                    if (!this.info.isBrowser()) {
784                        // rollback duplicates that aren't acknowledged
785                        List<MessageDispatch> tmp = null;
786                        synchronized (this.deliveredMessages) {
787                            tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
788                        }
789                        for (MessageDispatch old : tmp) {
790                            this.session.connection.rollbackDuplicate(this, old.getMessage());
791                        }
792                        tmp.clear();
793                    }
794                }
795                if (!session.isTransacted()) {
796                    synchronized(deliveredMessages) {
797                        deliveredMessages.clear();
798                    }
799                }
800                unconsumedMessages.close();
801                this.session.removeConsumer(this);
802                List<MessageDispatch> list = unconsumedMessages.removeAll();
803                if (!this.info.isBrowser()) {
804                    for (MessageDispatch old : list) {
805                        // ensure we don't filter this as a duplicate
806                        session.connection.rollbackDuplicate(this, old.getMessage());
807                    }
808                }
809            }
810        }
811    
812        /**
813         * @throws IllegalStateException
814         */
815        protected void checkClosed() throws IllegalStateException {
816            if (unconsumedMessages.isClosed()) {
817                throw new IllegalStateException("The Consumer is closed");
818            }
819        }
820    
821        /**
822         * If we have a zero prefetch specified then send a pull command to the
823         * broker to pull a message we are about to receive
824         */
825        protected void sendPullCommand(long timeout) throws JMSException {
826            clearDispatchList();
827            if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
828                MessagePull messagePull = new MessagePull();
829                messagePull.configure(info);
830                messagePull.setTimeout(timeout);
831                session.asyncSendPacket(messagePull);
832            }
833        }
834    
835        protected void checkMessageListener() throws JMSException {
836            session.checkMessageListener();
837        }
838    
839        protected void setOptimizeAcknowledge(boolean value) {
840            if (optimizeAcknowledge && !value) {
841                deliverAcks();
842            }
843            optimizeAcknowledge = value;
844        }
845    
846        protected void setPrefetchSize(int prefetch) {
847            deliverAcks();
848            this.info.setCurrentPrefetchSize(prefetch);
849        }
850    
851        private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
852            md.setDeliverySequenceId(session.getNextDeliveryId());
853            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
854            if (!isAutoAcknowledgeBatch()) {
855                synchronized(deliveredMessages) {
856                    deliveredMessages.addFirst(md);
857                }
858                if (session.getTransacted()) {
859                    if (transactedIndividualAck) {
860                        immediateIndividualTransactedAck(md);
861                    } else {
862                        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
863                    }
864                }
865            }
866        }
867    
868        private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException {
869            // acks accumulate on the broker pending transaction completion to indicate
870            // delivery status
871            registerSync();
872            MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
873            ack.setTransactionId(session.getTransactionContext().getTransactionId());
874            session.syncSendPacket(ack);
875        }
876    
877        private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
878            if (unconsumedMessages.isClosed()) {
879                return;
880            }
881            if (messageExpired) {
882                acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
883                stats.getExpiredMessageCount().increment();
884            } else {
885                stats.onMessage();
886                if (session.getTransacted()) {
887                    // Do nothing.
888                } else if (isAutoAcknowledgeEach()) {
889                    if (deliveryingAcknowledgements.compareAndSet(false, true)) {
890                        synchronized (deliveredMessages) {
891                            if (!deliveredMessages.isEmpty()) {
892                                if (optimizeAcknowledge) {
893                                    ackCounter++;
894    
895                                    // AMQ-3956 evaluate both expired and normal msgs as
896                                    // otherwise consumer may get stalled
897                                    if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
898                                        MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
899                                        if (ack != null) {
900                                            deliveredMessages.clear();
901                                            ackCounter = 0;
902                                            session.sendAck(ack);
903                                            optimizeAckTimestamp = System.currentTimeMillis();
904                                        }
905                                        // AMQ-3956 - as further optimization send
906                                        // ack for expired msgs when there are any.
907                                        // This resets the deliveredCounter to 0 so that
908                                        // we won't sent standard acks with every msg just
909                                        // because the deliveredCounter just below
910                                        // 0.5 * prefetch as used in ackLater()
911                                        if (pendingAck != null && deliveredCounter > 0) {
912                                            session.sendAck(pendingAck);
913                                            pendingAck = null;
914                                            deliveredCounter = 0;
915                                        }
916                                    }
917                                } else {
918                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
919                                    if (ack!=null) {
920                                        deliveredMessages.clear();
921                                        session.sendAck(ack);
922                                    }
923                                }
924                            }
925                        }
926                        deliveryingAcknowledgements.set(false);
927                    }
928                } else if (isAutoAcknowledgeBatch()) {
929                    ackLater(md, MessageAck.STANDARD_ACK_TYPE);
930                } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
931                    boolean messageUnackedByConsumer = false;
932                    synchronized (deliveredMessages) {
933                        messageUnackedByConsumer = deliveredMessages.contains(md);
934                    }
935                    if (messageUnackedByConsumer) {
936                        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
937                    }
938                }
939                else {
940                    throw new IllegalStateException("Invalid session state.");
941                }
942            }
943        }
944    
945        /**
946         * Creates a MessageAck for all messages contained in deliveredMessages.
947         * Caller should hold the lock for deliveredMessages.
948         *
949         * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
950         * @return <code>null</code> if nothing to ack.
951         */
952        private MessageAck makeAckForAllDeliveredMessages(byte type) {
953            synchronized (deliveredMessages) {
954                if (deliveredMessages.isEmpty())
955                    return null;
956    
957                MessageDispatch md = deliveredMessages.getFirst();
958                MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
959                ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
960                return ack;
961            }
962        }
963    
964        private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
965    
966            // Don't acknowledge now, but we may need to let the broker know the
967            // consumer got the message to expand the pre-fetch window
968            if (session.getTransacted()) {
969                registerSync();
970            }
971    
972            deliveredCounter++;
973    
974            MessageAck oldPendingAck = pendingAck;
975            pendingAck = new MessageAck(md, ackType, deliveredCounter);
976            pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
977            if( oldPendingAck==null ) {
978                pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
979            } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
980                pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
981            } else {
982                // old pending ack being superseded by ack of another type, if is is not a delivered
983                // ack and hence important, send it now so it is not lost.
984                if ( !oldPendingAck.isDeliveredAck()) {
985                    if (LOG.isDebugEnabled()) {
986                        LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
987                    }
988                    session.sendAck(oldPendingAck);
989                } else {
990                    if (LOG.isDebugEnabled()) {
991                        LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
992                    }
993                }
994            }
995            // AMQ-3956 evaluate both expired and normal msgs as
996            // otherwise consumer may get stalled
997            if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
998                session.sendAck(pendingAck);
999                pendingAck=null;
1000                deliveredCounter = 0;
1001                additionalWindowSize = 0;
1002            }
1003        }
1004    
1005        private void registerSync() throws JMSException {
1006            session.doStartTransaction();
1007            if (!synchronizationRegistered) {
1008                synchronizationRegistered = true;
1009                session.getTransactionContext().addSynchronization(new Synchronization() {
1010                    @Override
1011                    public void beforeEnd() throws Exception {
1012                        if (transactedIndividualAck) {
1013                            clearDispatchList();
1014                            waitForRedeliveries();
1015                            synchronized(deliveredMessages) {
1016                                rollbackOnFailedRecoveryRedelivery();
1017                            }
1018                        } else {
1019                            acknowledge();
1020                        }
1021                        synchronizationRegistered = false;
1022                    }
1023    
1024                    @Override
1025                    public void afterCommit() throws Exception {
1026                        commit();
1027                        synchronizationRegistered = false;
1028                    }
1029    
1030                    @Override
1031                    public void afterRollback() throws Exception {
1032                        rollback();
1033                        synchronizationRegistered = false;
1034                    }
1035                });
1036            }
1037        }
1038    
1039        /**
1040         * Acknowledge all the messages that have been delivered to the client up to
1041         * this point.
1042         *
1043         * @throws JMSException
1044         */
1045        public void acknowledge() throws JMSException {
1046            clearDispatchList();
1047            waitForRedeliveries();
1048            synchronized(deliveredMessages) {
1049                // Acknowledge all messages so far.
1050                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
1051                if (ack == null)
1052                    return; // no msgs
1053    
1054                if (session.getTransacted()) {
1055                    rollbackOnFailedRecoveryRedelivery();
1056                    session.doStartTransaction();
1057                    ack.setTransactionId(session.getTransactionContext().getTransactionId());
1058                }
1059    
1060                pendingAck = null;
1061                session.sendAck(ack);
1062    
1063                // Adjust the counters
1064                deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1065                additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1066    
1067                if (!session.getTransacted()) {
1068                    deliveredMessages.clear();
1069                }
1070            }
1071        }
1072    
1073        private void waitForRedeliveries() {
1074            if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1075                long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1076                int numberNotReplayed;
1077                do {
1078                    numberNotReplayed = 0;
1079                    synchronized(deliveredMessages) {
1080                        if (previouslyDeliveredMessages != null) {
1081                            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1082                                if (!entry.getValue()) {
1083                                    numberNotReplayed++;
1084                                }
1085                            }
1086                        }
1087                    }
1088                    if (numberNotReplayed > 0) {
1089                        LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
1090                                + previouslyDeliveredMessages.transactionId +  ", to consumer :" + this.getConsumerId());
1091                        try {
1092                            Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1093                        } catch (InterruptedException outOfhere) {
1094                            break;
1095                        }
1096                    }
1097                } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1098            }
1099        }
1100    
1101        /*
1102         * called with deliveredMessages locked
1103         */
1104        private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1105            if (previouslyDeliveredMessages != null) {
1106                // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1107                // as messages have been dispatched else where.
1108                int numberNotReplayed = 0;
1109                for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1110                    if (!entry.getValue()) {
1111                        numberNotReplayed++;
1112                        if (LOG.isDebugEnabled()) {
1113                            LOG.debug("previously delivered message has not been replayed in transaction: "
1114                                    + previouslyDeliveredMessages.transactionId
1115                                    + " , messageId: " + entry.getKey());
1116                        }
1117                    }
1118                }
1119                if (numberNotReplayed > 0) {
1120                    String message = "rolling back transaction ("
1121                        + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1122                        + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1123                    LOG.warn(message);
1124                    throw new TransactionRolledBackException(message);
1125                }
1126            }
1127        }
1128    
1129        void acknowledge(MessageDispatch md) throws JMSException {
1130            acknowledge(md, MessageAck.INDIVIDUAL_ACK_TYPE);
1131        }
1132    
1133        void acknowledge(MessageDispatch md, byte ackType) throws JMSException {
1134            MessageAck ack = new MessageAck(md, ackType, 1);
1135            session.sendAck(ack);
1136            synchronized(deliveredMessages){
1137                deliveredMessages.remove(md);
1138            }
1139        }
1140    
1141        public void commit() throws JMSException {
1142            synchronized (deliveredMessages) {
1143                deliveredMessages.clear();
1144                clearPreviouslyDelivered();
1145            }
1146            redeliveryDelay = 0;
1147        }
1148    
1149        public void rollback() throws JMSException {
1150            synchronized (unconsumedMessages.getMutex()) {
1151                if (optimizeAcknowledge) {
1152                    // remove messages read but not acked at the broker yet through
1153                    // optimizeAcknowledge
1154                    if (!this.info.isBrowser()) {
1155                        synchronized(deliveredMessages) {
1156                            for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
1157                                // ensure we don't filter this as a duplicate
1158                                MessageDispatch md = deliveredMessages.removeLast();
1159                                session.connection.rollbackDuplicate(this, md.getMessage());
1160                            }
1161                        }
1162                    }
1163                }
1164                synchronized(deliveredMessages) {
1165                    rollbackPreviouslyDeliveredAndNotRedelivered();
1166                    if (deliveredMessages.isEmpty()) {
1167                        return;
1168                    }
1169    
1170                    // use initial delay for first redelivery
1171                    MessageDispatch lastMd = deliveredMessages.getFirst();
1172                    final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
1173                    if (currentRedeliveryCount > 0) {
1174                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
1175                    } else {
1176                        redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
1177                    }
1178                    MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
1179    
1180                    for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1181                        MessageDispatch md = iter.next();
1182                        md.getMessage().onMessageRolledBack();
1183                        // ensure we don't filter this as a duplicate
1184                        session.connection.rollbackDuplicate(this, md.getMessage());
1185                    }
1186    
1187                    if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
1188                        && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
1189                        // We need to NACK the messages so that they get sent to the
1190                        // DLQ.
1191                        // Acknowledge the last message.
1192    
1193                        MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
1194                        ack.setPoisonCause(lastMd.getRollbackCause());
1195                        ack.setFirstMessageId(firstMsgId);
1196                        session.sendAck(ack,true);
1197                        // Adjust the window size.
1198                        additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1199                        redeliveryDelay = 0;
1200                    } else {
1201    
1202                        // only redelivery_ack after first delivery
1203                        if (currentRedeliveryCount > 0) {
1204                            MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
1205                            ack.setFirstMessageId(firstMsgId);
1206                            session.sendAck(ack,true);
1207                        }
1208    
1209                        // stop the delivery of messages.
1210                        if (nonBlockingRedelivery) {
1211                            if (!unconsumedMessages.isClosed()) {
1212    
1213                                final LinkedList<MessageDispatch> pendingRedeliveries =
1214                                    new LinkedList<MessageDispatch>(deliveredMessages);
1215    
1216                                // Start up the delivery again a little later.
1217                                session.getScheduler().executeAfterDelay(new Runnable() {
1218                                    public void run() {
1219                                        try {
1220                                            if (!unconsumedMessages.isClosed()) {
1221                                                for(MessageDispatch dispatch : pendingRedeliveries) {
1222                                                    session.dispatch(dispatch);
1223                                                }
1224                                            }
1225                                        } catch (Exception e) {
1226                                            session.connection.onAsyncException(e);
1227                                        }
1228                                    }
1229                                }, redeliveryDelay);
1230                            }
1231    
1232                        } else {
1233                            unconsumedMessages.stop();
1234    
1235                            for (MessageDispatch md : deliveredMessages) {
1236                                unconsumedMessages.enqueueFirst(md);
1237                            }
1238    
1239                            if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
1240                                // Start up the delivery again a little later.
1241                                session.getScheduler().executeAfterDelay(new Runnable() {
1242                                    public void run() {
1243                                        try {
1244                                            if (started.get()) {
1245                                                start();
1246                                            }
1247                                        } catch (JMSException e) {
1248                                            session.connection.onAsyncException(e);
1249                                        }
1250                                    }
1251                                }, redeliveryDelay);
1252                            } else {
1253                                start();
1254                            }
1255                        }
1256                    }
1257                    deliveredCounter -= deliveredMessages.size();
1258                    deliveredMessages.clear();
1259                }
1260            }
1261            if (messageListener.get() != null) {
1262                session.redispatch(this, unconsumedMessages);
1263            }
1264        }
1265    
1266        /*
1267         * called with unconsumedMessages && deliveredMessages locked
1268         * remove any message not re-delivered as they can't be replayed to this
1269         * consumer on rollback
1270         */
1271        private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1272            if (previouslyDeliveredMessages != null) {
1273                for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1274                    if (!entry.getValue()) {
1275                        removeFromDeliveredMessages(entry.getKey());
1276                    }
1277                }
1278                clearPreviouslyDelivered();
1279            }
1280        }
1281    
1282        /*
1283         * called with deliveredMessages locked
1284         */
1285        private void removeFromDeliveredMessages(MessageId key) {
1286            Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1287            while (iterator.hasNext()) {
1288                MessageDispatch candidate = iterator.next();
1289                if (key.equals(candidate.getMessage().getMessageId())) {
1290                    session.connection.rollbackDuplicate(this, candidate.getMessage());
1291                    iterator.remove();
1292                    break;
1293                }
1294            }
1295        }
1296    
1297        /*
1298         * called with deliveredMessages locked
1299         */
1300        private void clearPreviouslyDelivered() {
1301            if (previouslyDeliveredMessages != null) {
1302                previouslyDeliveredMessages.clear();
1303                previouslyDeliveredMessages = null;
1304            }
1305        }
1306    
1307        public void dispatch(MessageDispatch md) {
1308            MessageListener listener = this.messageListener.get();
1309            try {
1310                clearMessagesInProgress();
1311                clearDispatchList();
1312                synchronized (unconsumedMessages.getMutex()) {
1313                    if (!unconsumedMessages.isClosed()) {
1314                        if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1315                            if (listener != null && unconsumedMessages.isRunning()) {
1316                                ActiveMQMessage message = createActiveMQMessage(md);
1317                                beforeMessageIsConsumed(md);
1318                                try {
1319                                    boolean expired = message.isExpired();
1320                                    if (!expired) {
1321                                        listener.onMessage(message);
1322                                    }
1323                                    afterMessageIsConsumed(md, expired);
1324                                } catch (RuntimeException e) {
1325                                    LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
1326                                    if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
1327                                        // schedual redelivery and possible dlq processing
1328                                        md.setRollbackCause(e);
1329                                        rollback();
1330                                    } else {
1331                                        // Transacted or Client ack: Deliver the
1332                                        // next message.
1333                                        afterMessageIsConsumed(md, false);
1334                                    }
1335                                }
1336                            } else {
1337                                if (!unconsumedMessages.isRunning()) {
1338                                    // delayed redelivery, ensure it can be re delivered
1339                                    session.connection.rollbackDuplicate(this, md.getMessage());
1340                                }
1341                                unconsumedMessages.enqueue(md);
1342                                if (availableListener != null) {
1343                                    availableListener.onMessageAvailable(this);
1344                                }
1345                            }
1346                        } else {
1347                            if (!session.isTransacted()) {
1348                                LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
1349                                        + " to consumer: "  + getConsumerId() + ", ignoring (auto acking) duplicate: " + md);
1350                                MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
1351                                session.sendAck(ack);
1352                            } else {
1353                                if (LOG.isDebugEnabled()) {
1354                                    LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
1355                                }
1356                                boolean needsPoisonAck = false;
1357                                synchronized (deliveredMessages) {
1358                                    if (previouslyDeliveredMessages != null) {
1359                                        previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
1360                                    } else {
1361                                        // delivery while pending redelivery to another consumer on the same connection
1362                                        // not waiting for redelivery will help here
1363                                        needsPoisonAck = true;
1364                                    }
1365                                }
1366                                if (needsPoisonAck) {
1367                                    MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
1368                                    poisonAck.setFirstMessageId(md.getMessage().getMessageId());
1369                                    poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
1370                                            + session.getConnection().getConnectionInfo().getConnectionId()));
1371                                    LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
1372                                            + " consumer on this connection, failoverRedeliveryWaitPeriod="
1373                                            + failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck);
1374                                    session.sendAck(poisonAck);
1375                                } else {
1376                                    if (transactedIndividualAck) {
1377                                        immediateIndividualTransactedAck(md);
1378                                    } else {
1379                                        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
1380                                    }
1381                                }
1382                            }
1383                        }
1384                    }
1385                }
1386                if (++dispatchedCount % 1000 == 0) {
1387                    dispatchedCount = 0;
1388                    Thread.yield();
1389                }
1390            } catch (Exception e) {
1391                session.connection.onClientInternalException(e);
1392            }
1393        }
1394    
1395        // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1396        private void clearDispatchList() {
1397            if (clearDispatchList) {
1398                synchronized (deliveredMessages) {
1399                    if (clearDispatchList) {
1400                        if (!deliveredMessages.isEmpty()) {
1401                            if (session.isTransacted()) {
1402                                if (LOG.isDebugEnabled()) {
1403                                    LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1404                                }
1405                                if (previouslyDeliveredMessages == null) {
1406                                    previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1407                                }
1408                                for (MessageDispatch delivered : deliveredMessages) {
1409                                    previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1410                                }
1411                            } else {
1412                                if (LOG.isDebugEnabled()) {
1413                                    LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1414                                }
1415                                deliveredMessages.clear();
1416                                pendingAck = null;
1417                            }
1418                        }
1419                        clearDispatchList = false;
1420                    }
1421                }
1422            }
1423        }
1424    
1425        public int getMessageSize() {
1426            return unconsumedMessages.size();
1427        }
1428    
1429        public void start() throws JMSException {
1430            if (unconsumedMessages.isClosed()) {
1431                return;
1432            }
1433            started.set(true);
1434            unconsumedMessages.start();
1435            session.executor.wakeup();
1436        }
1437    
1438        public void stop() {
1439            started.set(false);
1440            unconsumedMessages.stop();
1441        }
1442    
1443        @Override
1444        public String toString() {
1445            return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1446                   + " }";
1447        }
1448    
1449        /**
1450         * Delivers a message to the message listener.
1451         *
1452         * @return
1453         * @throws JMSException
1454         */
1455        public boolean iterate() {
1456            MessageListener listener = this.messageListener.get();
1457            if (listener != null) {
1458                MessageDispatch md = unconsumedMessages.dequeueNoWait();
1459                if (md != null) {
1460                    dispatch(md);
1461                    return true;
1462                }
1463            }
1464            return false;
1465        }
1466    
1467        public boolean isInUse(ActiveMQTempDestination destination) {
1468            return info.getDestination().equals(destination);
1469        }
1470    
1471        public long getLastDeliveredSequenceId() {
1472            return lastDeliveredSequenceId;
1473        }
1474    
1475        public IOException getFailureError() {
1476            return failureError;
1477        }
1478    
1479        public void setFailureError(IOException failureError) {
1480            this.failureError = failureError;
1481        }
1482    
1483        /**
1484         * @return the optimizedAckScheduledAckInterval
1485         */
1486        public long getOptimizedAckScheduledAckInterval() {
1487            return optimizedAckScheduledAckInterval;
1488        }
1489    
1490        /**
1491         * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set
1492         */
1493        public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException {
1494            this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1495    
1496            if (this.optimizedAckTask != null) {
1497                try {
1498                    this.session.connection.getScheduler().cancel(optimizedAckTask);
1499                } catch (JMSException e) {
1500                    LOG.debug("Caught exception while cancelling old optimized ack task", e);
1501                    throw e;
1502                }
1503                this.optimizedAckTask = null;
1504            }
1505    
1506            // Should we periodically send out all outstanding acks.
1507            if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) {
1508                this.optimizedAckTask = new Runnable() {
1509    
1510                    @Override
1511                    public void run() {
1512                        try {
1513                            if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
1514                                if (LOG.isInfoEnabled()) {
1515                                    LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
1516                                }
1517                                deliverAcks();
1518                            }
1519                        } catch (Exception e) {
1520                            LOG.debug("Optimized Ack Task caught exception during ack", e);
1521                        }
1522                    }
1523                };
1524    
1525                try {
1526                    this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval);
1527                } catch (JMSException e) {
1528                    LOG.debug("Caught exception while scheduling new optimized ack task", e);
1529                    throw e;
1530                }
1531            }
1532        }
1533    }