001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.HashMap;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.concurrent.ExecutorService;
025    import java.util.concurrent.Executors;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.atomic.AtomicBoolean;
028    import java.util.concurrent.atomic.AtomicReference;
029    
030    import javax.jms.IllegalStateException;
031    import javax.jms.InvalidDestinationException;
032    import javax.jms.JMSException;
033    import javax.jms.Message;
034    import javax.jms.MessageListener;
035    
036    import org.apache.activemq.command.ActiveMQDestination;
037    import org.apache.activemq.command.ActiveMQMessage;
038    import org.apache.activemq.command.ActiveMQTempDestination;
039    import org.apache.activemq.command.ConsumerId;
040    import org.apache.activemq.command.ConsumerInfo;
041    import org.apache.activemq.command.MessageAck;
042    import org.apache.activemq.command.MessageDispatch;
043    import org.apache.activemq.command.MessageId;
044    import org.apache.activemq.command.MessagePull;
045    import org.apache.activemq.management.JMSConsumerStatsImpl;
046    import org.apache.activemq.management.StatsCapable;
047    import org.apache.activemq.management.StatsImpl;
048    import org.apache.activemq.selector.SelectorParser;
049    import org.apache.activemq.thread.Scheduler;
050    import org.apache.activemq.transaction.Synchronization;
051    import org.apache.activemq.util.Callback;
052    import org.apache.activemq.util.IntrospectionSupport;
053    import org.apache.activemq.util.JMSExceptionSupport;
054    import org.apache.commons.logging.Log;
055    import org.apache.commons.logging.LogFactory;
056    
057    /**
058     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
059     * from a destination. A <CODE> MessageConsumer</CODE> object is created by
060     * passing a <CODE>Destination</CODE> object to a message-consumer creation
061     * method supplied by a session.
062     * <P>
063     * <CODE>MessageConsumer</CODE> is the parent interface for all message
064     * consumers.
065     * <P>
066     * A message consumer can be created with a message selector. A message selector
067     * allows the client to restrict the messages delivered to the message consumer
068     * to those that match the selector.
069     * <P>
070     * A client may either synchronously receive a message consumer's messages or
071     * have the consumer asynchronously deliver them as they arrive.
072     * <P>
073     * For synchronous receipt, a client can request the next message from a message
074     * consumer using one of its <CODE> receive</CODE> methods. There are several
075     * variations of <CODE>receive</CODE> that allow a client to poll or wait for
076     * the next message.
077     * <P>
078     * For asynchronous delivery, a client can register a
079     * <CODE>MessageListener</CODE> object with a message consumer. As messages
080     * arrive at the message consumer, it delivers them by calling the
081     * <CODE>MessageListener</CODE>'s<CODE>
082     * onMessage</CODE> method.
083     * <P>
084     * It is a client programming error for a <CODE>MessageListener</CODE> to
085     * throw an exception.
086     * 
087     * @version $Revision: 1.22 $
088     * @see javax.jms.MessageConsumer
089     * @see javax.jms.QueueReceiver
090     * @see javax.jms.TopicSubscriber
091     * @see javax.jms.Session
092     */
093    public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
094    
095        private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
096    
097        protected final ActiveMQSession session;
098        protected final ConsumerInfo info;
099    
100        // These are the messages waiting to be delivered to the client
101        private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
102    
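    // These are the messages that were delivered to the consumer but that have
    // not been acknowledged. The list is kept in reverse order (newest first)
    // since we always walk it in reverse order.
    // NOTE(review): originally described as "only used when session is client
    // ack", but beforeMessageIsConsumed() adds to it for every mode except
    // dups-ok - confirm and update.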
106        private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
107        private int deliveredCounter;
108        private int additionalWindowSize;
109        private long redeliveryDelay;
110        private int ackCounter;
111        private int dispatchedCount;
112        private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
113        private JMSConsumerStatsImpl stats;
114    
115        private final String selector;
116        private boolean synchronizationRegistered;
117        private AtomicBoolean started = new AtomicBoolean(false);
118    
119        private MessageAvailableListener availableListener;
120    
121        private RedeliveryPolicy redeliveryPolicy;
122        private boolean optimizeAcknowledge;
123        private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
124        private ExecutorService executorService;
125        private MessageTransformer transformer;
126        private boolean clearDispatchList;
127    
128        private MessageAck pendingAck;
129    
130        /**
131         * Create a MessageConsumer
132         * 
133         * @param session
134         * @param dest
135         * @param name
136         * @param selector
137         * @param prefetch
138         * @param maximumPendingMessageCount TODO
139         * @param noLocal
140         * @param browser
141         * @param dispatchAsync
142         * @param messageListener
143         * @throws JMSException
144         */
145        public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
146                String name, String selector, int prefetch,
147                int maximumPendingMessageCount, boolean noLocal, boolean browser,
148                boolean dispatchAsync, MessageListener messageListener) throws JMSException {
149            if (dest == null) {
150                throw new InvalidDestinationException("Don't understand null destinations");
151            } else if (dest.getPhysicalName() == null) {
152                throw new InvalidDestinationException("The destination object was not given a physical name.");
153            } else if (dest.isTemporary()) {
154                String physicalName = dest.getPhysicalName();
155    
156                if (physicalName == null) {
157                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
158                }
159    
160                String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
161    
162                if (physicalName.indexOf(connectionID) < 0) {
163                    throw new InvalidDestinationException(
164                                                          "Cannot use a Temporary destination from another Connection");
165                }
166    
167                if (session.connection.isDeleted(dest)) {
168                    throw new InvalidDestinationException(
169                                                          "Cannot use a Temporary destination that has been deleted");
170                }
171                if (prefetch < 0) {
172                    throw new JMSException("Cannot have a prefetch size less than zero");
173                }
174            }
175    
176            this.session = session;
177            this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
178            setTransformer(session.getTransformer());
179    
180            this.info = new ConsumerInfo(consumerId);
181            this.info.setExclusive(this.session.connection.isExclusiveConsumer());
182            this.info.setSubscriptionName(name);
183            this.info.setPrefetchSize(prefetch);
184            this.info.setCurrentPrefetchSize(prefetch);
185            this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
186            this.info.setNoLocal(noLocal);
187            this.info.setDispatchAsync(dispatchAsync);
188            this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
189            this.info.setSelector(null);
190    
191            // Allows the options on the destination to configure the consumerInfo
192            if (dest.getOptions() != null) {
193                Map<String, String> options = new HashMap<String, String>(dest.getOptions());
194                IntrospectionSupport.setProperties(this.info, options, "consumer.");
195            }
196    
197            this.info.setDestination(dest);
198            this.info.setBrowser(browser);
199            if (selector != null && selector.trim().length() != 0) {
200                // Validate the selector
201                new SelectorParser().parse(selector);
202                this.info.setSelector(selector);
203                this.selector = selector;
204            } else if (info.getSelector() != null) {
205                // Validate the selector
206                new SelectorParser().parse(this.info.getSelector());
207                this.selector = this.info.getSelector();
208            } else {
209                this.selector = null;
210            }
211    
212            this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
213            this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
214                                       && !info.isBrowser();
215            this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
216    
217            if (messageListener != null) {
218                setMessageListener(messageListener);
219            }
220            try {
221                this.session.addConsumer(this);
222                this.session.syncSendPacket(info);
223            } catch (JMSException e) {
224                this.session.removeConsumer(this);
225                throw e;
226            }
227    
228            if (session.connection.isStarted()) {
229                start();
230            }
231        }
232    
233        public StatsImpl getStats() {
234            return stats;
235        }
236    
237        public JMSConsumerStatsImpl getConsumerStats() {
238            return stats;
239        }
240    
241        public RedeliveryPolicy getRedeliveryPolicy() {
242            return redeliveryPolicy;
243        }
244    
245        /**
246         * Sets the redelivery policy used when messages are redelivered
247         */
248        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
249            this.redeliveryPolicy = redeliveryPolicy;
250        }
251    
252        public MessageTransformer getTransformer() {
253            return transformer;
254        }
255    
256        /**
257         * Sets the transformer used to transform messages before they are sent on
258         * to the JMS bus
259         */
260        public void setTransformer(MessageTransformer transformer) {
261            this.transformer = transformer;
262        }
263    
264        /**
265         * @return Returns the value.
266         */
267        public ConsumerId getConsumerId() {
268            return info.getConsumerId();
269        }
270    
271        /**
272         * @return the consumer name - used for durable consumers
273         */
274        public String getConsumerName() {
275            return this.info.getSubscriptionName();
276        }
277    
278        /**
279         * @return true if this consumer does not accept locally produced messages
280         */
281        protected boolean isNoLocal() {
282            return info.isNoLocal();
283        }
284    
285        /**
286         * Retrieve is a browser
287         * 
288         * @return true if a browser
289         */
290        protected boolean isBrowser() {
291            return info.isBrowser();
292        }
293    
294        /**
295         * @return ActiveMQDestination
296         */
297        protected ActiveMQDestination getDestination() {
298            return info.getDestination();
299        }
300    
301        /**
302         * @return Returns the prefetchNumber.
303         */
304        public int getPrefetchNumber() {
305            return info.getPrefetchSize();
306        }
307    
308        /**
309         * @return true if this is a durable topic subscriber
310         */
311        public boolean isDurableSubscriber() {
312            return info.getSubscriptionName() != null && info.getDestination().isTopic();
313        }
314    
315        /**
316         * Gets this message consumer's message selector expression.
317         * 
318         * @return this message consumer's message selector, or null if no message
319         *         selector exists for the message consumer (that is, if the message
320         *         selector was not set or was set to null or the empty string)
321         * @throws JMSException if the JMS provider fails to receive the next
322         *                 message due to some internal error.
323         */
324        public String getMessageSelector() throws JMSException {
325            checkClosed();
326            return selector;
327        }
328    
329        /**
330         * Gets the message consumer's <CODE>MessageListener</CODE>.
331         * 
332         * @return the listener for the message consumer, or null if no listener is
333         *         set
334         * @throws JMSException if the JMS provider fails to get the message
335         *                 listener due to some internal error.
336         * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
337         */
338        public MessageListener getMessageListener() throws JMSException {
339            checkClosed();
340            return this.messageListener.get();
341        }
342    
343        /**
344         * Sets the message consumer's <CODE>MessageListener</CODE>.
345         * <P>
346         * Setting the message listener to null is the equivalent of unsetting the
347         * message listener for the message consumer.
348         * <P>
349         * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
350         * while messages are being consumed by an existing listener or the consumer
351         * is being used to consume messages synchronously is undefined.
352         * 
353         * @param listener the listener to which the messages are to be delivered
354         * @throws JMSException if the JMS provider fails to receive the next
355         *                 message due to some internal error.
356         * @see javax.jms.MessageConsumer#getMessageListener
357         */
358        public void setMessageListener(MessageListener listener) throws JMSException {
359            checkClosed();
360            if (info.getPrefetchSize() == 0) {
361                throw new JMSException(
362                                       "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
363            }
364            if (listener != null) {
365                boolean wasRunning = session.isRunning();
366                if (wasRunning) {
367                    session.stop();
368                }
369    
370                this.messageListener.set(listener);
371                session.redispatch(this, unconsumedMessages);
372    
373                if (wasRunning) {
374                    session.start();
375                }
376            } else {
377                this.messageListener.set(null);
378            }
379        }
380    
381        public MessageAvailableListener getAvailableListener() {
382            return availableListener;
383        }
384    
385        /**
386         * Sets the listener used to notify synchronous consumers that there is a
387         * message available so that the {@link MessageConsumer#receiveNoWait()} can
388         * be called.
389         */
390        public void setAvailableListener(MessageAvailableListener availableListener) {
391            this.availableListener = availableListener;
392        }
393    
394        /**
395         * Used to get an enqueued message from the unconsumedMessages list. The
396         * amount of time this method blocks is based on the timeout value. - if
397         * timeout==-1 then it blocks until a message is received. - if timeout==0
398         * then it it tries to not block at all, it returns a message if it is
399         * available - if timeout>0 then it blocks up to timeout amount of time.
400         * Expired messages will consumed by this method.
401         * 
402         * @throws JMSException
403         * @return null if we timeout or if the consumer is closed.
404         */
405        private MessageDispatch dequeue(long timeout) throws JMSException {
406            try {
407                long deadline = 0;
408                if (timeout > 0) {
409                    deadline = System.currentTimeMillis() + timeout;
410                }
411                while (true) {
412                    MessageDispatch md = unconsumedMessages.dequeue(timeout);
413                    if (md == null) {
414                        if (timeout > 0 && !unconsumedMessages.isClosed()) {
415                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
416                        } else {
417                            return null;
418                        }
419                    } else if (md.getMessage() == null) {
420                        return null;
421                    } else if (md.getMessage().isExpired()) {
422                        if (LOG.isDebugEnabled()) {
423                            LOG.debug(getConsumerId() + " received expired message: " + md);
424                        }
425                        beforeMessageIsConsumed(md);
426                        afterMessageIsConsumed(md, true);
427                        if (timeout > 0) {
428                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
429                        }
430                    } else {
431                        if (LOG.isDebugEnabled()) {
432                            LOG.debug(getConsumerId() + " received message: " + md);
433                        }
434                        return md;
435                    }
436                }
437            } catch (InterruptedException e) {
438                Thread.currentThread().interrupt();
439                throw JMSExceptionSupport.create(e);
440            }
441        }
442    
443        /**
444         * Receives the next message produced for this message consumer.
445         * <P>
446         * This call blocks indefinitely until a message is produced or until this
447         * message consumer is closed.
448         * <P>
449         * If this <CODE>receive</CODE> is done within a transaction, the consumer
450         * retains the message until the transaction commits.
451         * 
452         * @return the next message produced for this message consumer, or null if
453         *         this message consumer is concurrently closed
454         */
455        public Message receive() throws JMSException {
456            checkClosed();
457            checkMessageListener();
458    
459            sendPullCommand(0);
460            MessageDispatch md = dequeue(-1);
461            if (md == null) {
462                return null;
463            }
464    
465            beforeMessageIsConsumed(md);
466            afterMessageIsConsumed(md, false);
467    
468            return createActiveMQMessage(md);
469        }
470    
471        /**
472         * @param md
473         * @return
474         */
475        private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
476            ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
477            if (transformer != null) {
478                Message transformedMessage = transformer.consumerTransform(session, this, m);
479                if (transformedMessage != null) {
480                    m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
481                }
482            }
483            if (session.isClientAcknowledge()) {
484                m.setAcknowledgeCallback(new Callback() {
485                    public void execute() throws Exception {
486                        session.checkClosed();
487                        session.acknowledge();
488                    }
489                });
490            }else if (session.isIndividualAcknowledge()) {
491                m.setAcknowledgeCallback(new Callback() {
492                    public void execute() throws Exception {
493                        session.checkClosed();
494                        acknowledge(md);
495                    }
496                });
497            }
498            return m;
499        }
500    
501        /**
502         * Receives the next message that arrives within the specified timeout
503         * interval.
504         * <P>
505         * This call blocks until a message arrives, the timeout expires, or this
506         * message consumer is closed. A <CODE>timeout</CODE> of zero never
507         * expires, and the call blocks indefinitely.
508         * 
509         * @param timeout the timeout value (in milliseconds), a time out of zero
510         *                never expires.
511         * @return the next message produced for this message consumer, or null if
512         *         the timeout expires or this message consumer is concurrently
513         *         closed
514         */
515        public Message receive(long timeout) throws JMSException {
516            checkClosed();
517            checkMessageListener();
518            if (timeout == 0) {
519                return this.receive();
520    
521            }
522    
523            sendPullCommand(timeout);
524            while (timeout > 0) {
525    
526                MessageDispatch md;
527                if (info.getPrefetchSize() == 0) {
528                    md = dequeue(-1); // We let the broker let us know when we
529                    // timeout.
530                } else {
531                    md = dequeue(timeout);
532                }
533    
534                if (md == null) {
535                    return null;
536                }
537    
538                beforeMessageIsConsumed(md);
539                afterMessageIsConsumed(md, false);
540                return createActiveMQMessage(md);
541            }
542            return null;
543        }
544    
545        /**
546         * Receives the next message if one is immediately available.
547         * 
548         * @return the next message produced for this message consumer, or null if
549         *         one is not available
550         * @throws JMSException if the JMS provider fails to receive the next
551         *                 message due to some internal error.
552         */
553        public Message receiveNoWait() throws JMSException {
554            checkClosed();
555            checkMessageListener();
556            sendPullCommand(-1);
557    
558            MessageDispatch md;
559            if (info.getPrefetchSize() == 0) {
560                md = dequeue(-1); // We let the broker let us know when we
561                // timeout.
562            } else {
563                md = dequeue(0);
564            }
565    
566            if (md == null) {
567                return null;
568            }
569    
570            beforeMessageIsConsumed(md);
571            afterMessageIsConsumed(md, false);
572            return createActiveMQMessage(md);
573        }
574    
575        /**
576         * Closes the message consumer.
577         * <P>
578         * Since a provider may allocate some resources on behalf of a <CODE>
579         * MessageConsumer</CODE>
580         * outside the Java virtual machine, clients should close them when they are
581         * not needed. Relying on garbage collection to eventually reclaim these
582         * resources may not be timely enough.
583         * <P>
584         * This call blocks until a <CODE>receive</CODE> or message listener in
585         * progress has completed. A blocked message consumer <CODE>receive </CODE>
586         * call returns null when this message consumer is closed.
587         * 
588         * @throws JMSException if the JMS provider fails to close the consumer due
589         *                 to some internal error.
590         */
591        public void close() throws JMSException {
592            if (!unconsumedMessages.isClosed()) {
593                dispose();
594                this.session.asyncSendPacket(info.createRemoveCommand());
595            }
596        }
597    
598        void clearMessagesInProgress() {
599            // we are called from inside the transport reconnection logic
600            // which involves us clearing all the connections' consumers
601            // dispatch lists and clearing them
602            // so rather than trying to grab a mutex (which could be already
603            // owned by the message listener calling the send) we will just set
604            // a flag so that the list can be cleared as soon as the
605            // dispatch thread is ready to flush the dispatch list
606            clearDispatchList = true;
607        }
608    
609        void deliverAcks() {
610            MessageAck ack = null;
611            if (deliveryingAcknowledgements.compareAndSet(false, true)) {
612                if (this.optimizeAcknowledge) {
613                    synchronized(deliveredMessages) {
614                            ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
615                            if (ack != null) {
616                                    deliveredMessages.clear();
617                                    ackCounter = 0;
618                            }
619                    }
620                } else {
621                    ack = pendingAck;
622                }
623                if (ack != null) {
624                    final MessageAck ackToSend = ack;
625                    if (executorService == null) {
626                        executorService = Executors.newSingleThreadExecutor();
627                    }
628                    executorService.submit(new Runnable() {
629                        public void run() {
630                            try {
631                                session.sendAck(ackToSend,true);
632                            } catch (JMSException e) {
633                                LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
634                            } finally {
635                                deliveryingAcknowledgements.set(false);
636                            }
637                        }
638                    });
639                } else {
640                    deliveryingAcknowledgements.set(false);
641                }
642            }
643        }
644    
645        public void dispose() throws JMSException {
646            if (!unconsumedMessages.isClosed()) {
647                
648    //            if ( !deliveredMessages.isEmpty() ) {
649    //                // We need to let the broker know how many times that message
650    //                // was rolled back.
651    //                rollbackCounter++;
652    //                MessageDispatch lastMd = deliveredMessages.get(0);
653    //            }
654    
655                // Do we have any acks we need to send out before closing?
656                // Ack any delivered messages now. (session may still
657                // commit/rollback the acks).
658                // only processes optimized acknowledgements
659                deliverAcks();
660                if (executorService != null) {
661                    executorService.shutdown();
662                    try {
663                        executorService.awaitTermination(60, TimeUnit.SECONDS);
664                    } catch (InterruptedException e) {
665                        Thread.currentThread().interrupt();
666                    }
667                }
668                if (session.isTransacted() || session.isDupsOkAcknowledge()) {
669                    acknowledge();
670                }
671                if (session.isClientAcknowledge()) {
672                    if (!this.info.isBrowser()) {
673                        // rollback duplicates that aren't acknowledged
674                        for (MessageDispatch old : deliveredMessages) {
675                            session.connection.rollbackDuplicate(this, old.getMessage());
676                        }
677                    }
678                }
679                synchronized(deliveredMessages) {
680                    deliveredMessages.clear();
681                }
682                List<MessageDispatch> list = unconsumedMessages.removeAll();
683                if (!this.info.isBrowser()) {
684                    for (MessageDispatch old : list) {
685                        // ensure we don't filter this as a duplicate
686                        session.connection.rollbackDuplicate(this, old.getMessage());
687                    }
688                }
689                unconsumedMessages.close();
690                this.session.removeConsumer(this);
691            }
692        }
693    
694        /**
695         * @throws IllegalStateException
696         */
697        protected void checkClosed() throws IllegalStateException {
698            if (unconsumedMessages.isClosed()) {
699                throw new IllegalStateException("The Consumer is closed");
700            }
701        }
702    
703        /**
704         * If we have a zero prefetch specified then send a pull command to the
705         * broker to pull a message we are about to receive
706         */
707        protected void sendPullCommand(long timeout) throws JMSException {
708            if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
709                MessagePull messagePull = new MessagePull();
710                messagePull.configure(info);
711                messagePull.setTimeout(timeout);
712                session.asyncSendPacket(messagePull);
713            }
714        }
715    
716        protected void checkMessageListener() throws JMSException {
717            session.checkMessageListener();
718        }
719    
720        protected void setOptimizeAcknowledge(boolean value) {
721            if (optimizeAcknowledge && !value) {
722                deliverAcks();
723            }
724            optimizeAcknowledge = value;
725        }
726    
727        protected void setPrefetchSize(int prefetch) {
728            deliverAcks();
729            this.info.setCurrentPrefetchSize(prefetch);
730        }
731    
732        private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
733            md.setDeliverySequenceId(session.getNextDeliveryId());
734            if (!session.isDupsOkAcknowledge()) {
735                synchronized(deliveredMessages) {
736                    deliveredMessages.addFirst(md);
737                }
738                if (session.isTransacted()) {
739                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
740                }
741            }
742        }
743    
744        private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
745            if (unconsumedMessages.isClosed()) {
746                return;
747            }
748            if (messageExpired) {
749                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
750            } else {
751                stats.onMessage();
752                if (session.isTransacted()) {
753                    // Do nothing.
754                } else if (session.isAutoAcknowledge()) {
755                    synchronized (deliveredMessages) {
756                        if (!deliveredMessages.isEmpty()) {
757                            if (optimizeAcknowledge) {
758                                if (deliveryingAcknowledgements.compareAndSet(
759                                        false, true)) {
760                                    ackCounter++;
761                                    if (ackCounter >= (info
762                                            .getCurrentPrefetchSize() * .65)) {
763                                            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
764                                            if (ack != null) {
765                                                deliveredMessages.clear();
766                                                ackCounter = 0;
767                                                session.sendAck(ack);
768                                            }
769                                    }
770                                    deliveryingAcknowledgements.set(false);
771                                }
772                            } else {
773                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
774                                if (ack!=null) {
775                                    deliveredMessages.clear();
776                                    session.sendAck(ack);
777                                }
778                            }
779                        }
780                    }
781                } else if (session.isDupsOkAcknowledge()) {
782                    ackLater(md, MessageAck.STANDARD_ACK_TYPE);
783                } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
784                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
785                } 
786                else {
787                    throw new IllegalStateException("Invalid session state.");
788                }
789            }
790        }
791    
792        /**
793         * Creates a MessageAck for all messages contained in deliveredMessages.
794         * Caller should hold the lock for deliveredMessages.
795         * 
796         * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 
797         * @return <code>null</code> if nothing to ack.
798         */
799            private MessageAck makeAckForAllDeliveredMessages(byte type) {
800                    synchronized (deliveredMessages) {
801                            if (deliveredMessages.isEmpty())
802                                    return null;
803                                
804                            MessageDispatch md = deliveredMessages.getFirst();
805                        MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
806                        ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
807                        return ack;
808                    }
809            }
810    
811        private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
812    
813            // Don't acknowledge now, but we may need to let the broker know the
814            // consumer got the message
815            // to expand the pre-fetch window
816            if (session.isTransacted()) {
817                session.doStartTransaction();
818                if (!synchronizationRegistered) {
819                    synchronizationRegistered = true;
820                    session.getTransactionContext().addSynchronization(new Synchronization() {
821                        public void beforeEnd() throws Exception {
822                            acknowledge();
823                            synchronizationRegistered = false;
824                        }
825    
826                        public void afterCommit() throws Exception {
827                            commit();
828                            synchronizationRegistered = false;
829                        }
830    
831                        public void afterRollback() throws Exception {
832                            rollback();
833                            synchronizationRegistered = false;
834                        }
835                    });
836                }
837            }
838    
839            // The delivered message list is only needed for the recover method
840            // which is only used with client ack.
841            deliveredCounter++;
842            
843            MessageAck oldPendingAck = pendingAck;
844            pendingAck = new MessageAck(md, ackType, deliveredCounter);
845            if( oldPendingAck==null ) {
846                pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
847            } else {
848                pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
849            }
850            pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
851    
852            if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
853                session.sendAck(pendingAck);
854                pendingAck=null;
855                additionalWindowSize = deliveredCounter;
856    
857                // When using DUPS ok, we do a real ack.
858                if (ackType == MessageAck.STANDARD_ACK_TYPE) {
859                    deliveredCounter = 0;
860                    additionalWindowSize = 0;
861                }
862            }
863        }
864    
865        /**
866         * Acknowledge all the messages that have been delivered to the client upto
867         * this point.
868         * 
869         * @throws JMSException
870         */
871        public void acknowledge() throws JMSException {
872            synchronized(deliveredMessages) {
873                // Acknowledge all messages so far.
874                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
875                if (ack == null)
876                    return; // no msgs
877                
878                if (session.isTransacted()) {
879                    session.doStartTransaction();
880                    ack.setTransactionId(session.getTransactionContext().getTransactionId());
881                }
882                session.sendAck(ack);
883                pendingAck = null;
884        
885                // Adjust the counters
886                deliveredCounter -= deliveredMessages.size();
887                additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
888        
889                if (!session.isTransacted()) {
890                    deliveredMessages.clear();
891                }
892            }
893        }
894        
895        void acknowledge(MessageDispatch md) throws JMSException {
896            MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
897            session.sendAck(ack);
898            synchronized(deliveredMessages){
899                deliveredMessages.remove(md);
900            }
901        }
902    
903        public void commit() throws JMSException {
904            synchronized (deliveredMessages) {
905                deliveredMessages.clear();
906            }
907            redeliveryDelay = 0;
908        }
909    
910        public void rollback() throws JMSException {
911            synchronized (unconsumedMessages.getMutex()) {
912                if (optimizeAcknowledge) {
913                    // remove messages read but not acked at the broker yet through
914                    // optimizeAcknowledge
915                    if (!this.info.isBrowser()) {
916                        synchronized(deliveredMessages) {
917                            for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
918                                // ensure we don't filter this as a duplicate
919                                MessageDispatch md = deliveredMessages.removeLast();
920                                session.connection.rollbackDuplicate(this, md.getMessage());
921                            }
922                        }
923                    }
924                }
925                synchronized(deliveredMessages) {
926                    if (deliveredMessages.isEmpty()) {
927                        return;
928                    }
929        
930                    // Only increase the redlivery delay after the first redelivery..
931                    MessageDispatch lastMd = deliveredMessages.getFirst();
932                    if (lastMd.getMessage().getRedeliveryCounter() > 0) {
933                        redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
934                    }
935                    MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
936        
937                    for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
938                        MessageDispatch md = (MessageDispatch)iter.next();
939                        md.getMessage().onMessageRolledBack();
940                    }
941        
942                    if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
943                        && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
944                        // We need to NACK the messages so that they get sent to the
945                        // DLQ.
946                        // Acknowledge the last message.
947                        
948                        MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
949                                            ack.setFirstMessageId(firstMsgId);
950                        session.sendAck(ack,true);
951                        // ensure we don't filter this as a duplicate
952                        session.connection.rollbackDuplicate(this, lastMd.getMessage());
953                        // Adjust the window size.
954                        additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
955                        redeliveryDelay = 0;
956                    } else {
957                        
958                        MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
959                        ack.setFirstMessageId(firstMsgId);
960                        session.sendAck(ack,true);
961        
962                        // stop the delivery of messages.
963                        unconsumedMessages.stop();
964        
965                        for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
966                            MessageDispatch md = (MessageDispatch)iter.next();
967                            unconsumedMessages.enqueueFirst(md);
968                        }
969        
970                        if (redeliveryDelay > 0) {
971                            // Start up the delivery again a little later.
972                            Scheduler.executeAfterDelay(new Runnable() {
973                                public void run() {
974                                    try {
975                                        if (started.get()) {
976                                            start();
977                                        }
978                                    } catch (JMSException e) {
979                                        session.connection.onAsyncException(e);
980                                    }
981                                }
982                            }, redeliveryDelay);
983                        } else {
984                            start();
985                        }
986        
987                    }
988                    deliveredCounter -= deliveredMessages.size();
989                    deliveredMessages.clear();
990                }
991            }
992            if (messageListener.get() != null) {
993                session.redispatch(this, unconsumedMessages);
994            }
995        }
996    
997        public void dispatch(MessageDispatch md) {
998            MessageListener listener = this.messageListener.get();
999            try {
1000                synchronized (unconsumedMessages.getMutex()) {
1001                    if (clearDispatchList) {
1002                        // we are reconnecting so lets flush the in progress
1003                        // messages
1004                        clearDispatchList = false;
1005                        List<MessageDispatch> list = unconsumedMessages.removeAll();
1006                        if (!this.info.isBrowser()) {
1007                            for (MessageDispatch old : list) {
1008                                // ensure we don't filter this as a duplicate
1009                                session.connection.rollbackDuplicate(this, old.getMessage());
1010                            }
1011                        }
1012                    }
1013                    if (!unconsumedMessages.isClosed()) {
1014                        if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1015                            if (listener != null && unconsumedMessages.isRunning()) {
1016                                ActiveMQMessage message = createActiveMQMessage(md);
1017                                beforeMessageIsConsumed(md);
1018                                try {
1019                                    boolean expired = message.isExpired();
1020                                    if (!expired) {
1021                                        listener.onMessage(message);
1022                                    }
1023                                    afterMessageIsConsumed(md, expired);
1024                                } catch (RuntimeException e) {
1025                                    if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
1026                                        // Redeliver the message
1027                                    } else {
1028                                        // Transacted or Client ack: Deliver the
1029                                        // next message.
1030                                        afterMessageIsConsumed(md, false);
1031                                    }
1032                                    LOG.error(getConsumerId() + " Exception while processing message: " + e, e);
1033                                }
1034                            } else {
1035                                unconsumedMessages.enqueue(md);
1036                                if (availableListener != null) {
1037                                    availableListener.onMessageAvailable(this);
1038                                }
1039                            }
1040                        } else {
1041                            // ignore duplicate
1042                            if (LOG.isDebugEnabled()) {
1043                                LOG.debug(getConsumerId() + " Ignoring Duplicate: " + md.getMessage());
1044                            }
1045                            ackLater(md, MessageAck.STANDARD_ACK_TYPE);
1046                        }
1047                    }
1048                }
1049                if (++dispatchedCount % 1000 == 0) {
1050                    dispatchedCount = 0;
1051                    Thread.yield();
1052                }
1053            } catch (Exception e) {
1054                session.connection.onClientInternalException(e);
1055            }
1056        }
1057    
1058        public int getMessageSize() {
1059            return unconsumedMessages.size();
1060        }
1061    
1062        public void start() throws JMSException {
1063            if (unconsumedMessages.isClosed()) {
1064                return;
1065            }
1066            started.set(true);
1067            unconsumedMessages.start();
1068            session.executor.wakeup();
1069        }
1070    
1071        public void stop() {
1072            started.set(false);
1073            unconsumedMessages.stop();
1074        }
1075    
1076        public String toString() {
1077            return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1078                   + " }";
1079        }
1080    
1081        /**
1082         * Delivers a message to the message listener.
1083         * 
1084         * @return
1085         * @throws JMSException
1086         */
1087        public boolean iterate() {
1088            MessageListener listener = this.messageListener.get();
1089            if (listener != null) {
1090                MessageDispatch md = unconsumedMessages.dequeueNoWait();
1091                if (md != null) {
1092                    try {
1093                        ActiveMQMessage message = createActiveMQMessage(md);
1094                        beforeMessageIsConsumed(md);
1095                        listener.onMessage(message);
1096                        afterMessageIsConsumed(md, false);
1097                    } catch (JMSException e) {
1098                        session.connection.onClientInternalException(e);
1099                    }
1100                    return true;
1101                }
1102            }
1103            return false;
1104        }
1105    
1106        public boolean isInUse(ActiveMQTempDestination destination) {
1107            return info.getDestination().equals(destination);
1108        }
1109    
1110    }