001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import org.apache.activemq.blob.BlobTransferPolicy;
020    import org.apache.activemq.blob.BlobUploader;
021    import org.apache.activemq.command.*;
022    import org.apache.activemq.management.JMSSessionStatsImpl;
023    import org.apache.activemq.management.StatsCapable;
024    import org.apache.activemq.management.StatsImpl;
025    import org.apache.activemq.thread.Scheduler;
026    import org.apache.activemq.transaction.Synchronization;
027    import org.apache.activemq.usage.MemoryUsage;
028    import org.apache.activemq.util.Callback;
029    import org.apache.activemq.util.LongSequenceGenerator;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    
033    import javax.jms.*;
034    import javax.jms.IllegalStateException;
035    import javax.jms.Message;
036    import java.io.File;
037    import java.io.InputStream;
038    import java.io.Serializable;
039    import java.net.URL;
040    import java.util.Collections;
041    import java.util.Iterator;
042    import java.util.List;
043    import java.util.concurrent.CopyOnWriteArrayList;
044    import java.util.concurrent.atomic.AtomicBoolean;
045    
046    /**
047     * <P>
048     * A <CODE>Session</CODE> object is a single-threaded context for producing
049     * and consuming messages. Although it may allocate provider resources outside
050     * the Java virtual machine (JVM), it is considered a lightweight JMS object.
051     * <P>
052     * A session serves several purposes:
053     * <UL>
054     * <LI>It is a factory for its message producers and consumers.
055     * <LI>It supplies provider-optimized message factories.
056     * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
057     * <CODE>TemporaryQueues</CODE>.
058     * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
059     * objects for those clients that need to dynamically manipulate
060     * provider-specific destination names.
061     * <LI>It supports a single series of transactions that combine work spanning
062     * its producers and consumers into atomic units.
063     * <LI>It defines a serial order for the messages it consumes and the messages
064     * it produces.
065     * <LI>It retains messages it consumes until they have been acknowledged.
066     * <LI>It serializes execution of message listeners registered with its message
067     * consumers.
068     * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
069     * </UL>
070     * <P>
071     * A session can create and service multiple message producers and consumers.
072     * <P>
073     * One typical use is to have a thread block on a synchronous
074     * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
075     * use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
076     * <P>
077     * If a client desires to have one thread produce messages while others consume
078     * them, the client should use a separate session for its producing thread.
079     * <P>
080     * Once a connection has been started, any session with one or more registered
081     * message listeners is dedicated to the thread of control that delivers
082     * messages to it. It is erroneous for client code to use this session or any of
083     * its constituent objects from another thread of control. The only exception to
084     * this rule is the use of the session or connection <CODE>close</CODE>
085     * method.
086     * <P>
087     * It should be easy for most clients to partition their work naturally into
088     * sessions. This model allows clients to start simply and incrementally add
089     * message processing complexity as their need for concurrency grows.
090     * <P>
091     * The <CODE>close</CODE> method is the only session method that can be called
092     * while some other session method is being executed in another thread.
093     * <P>
094     * A session may be specified as transacted. Each transacted session supports a
095     * single series of transactions. Each transaction groups a set of message sends
096     * and a set of message receives into an atomic unit of work. In effect,
097     * transactions organize a session's input message stream and output message
098     * stream into series of atomic units. When a transaction commits, its atomic
099     * unit of input is acknowledged and its associated atomic unit of output is
100     * sent. If a transaction rollback is done, the transaction's sent messages are
101     * destroyed and the session's input is automatically recovered.
102     * <P>
103     * The content of a transaction's input and output units is simply those
104     * messages that have been produced and consumed within the session's current
105     * transaction.
106     * <P>
107     * A transaction is completed using either its session's <CODE>commit</CODE>
108     * method or its session's <CODE>rollback </CODE> method. The completion of a
109     * session's current transaction automatically begins the next. The result is
110     * that a transacted session always has a current transaction within which its
111     * work is done.
112     * <P>
113     * The Java Transaction Service (JTS) or some other transaction monitor may be
114     * used to combine a session's transaction with transactions on other resources
115     * (databases, other JMS sessions, etc.). Since Java distributed transactions
116     * are controlled via the Java Transaction API (JTA), use of the session's
117     * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
118     * prohibited.
119     * <P>
120     * The JMS API does not require support for JTA; however, it does define how a
121     * provider supplies this support.
122     * <P>
123     * Although it is also possible for a JMS client to handle distributed
124     * transactions directly, it is unlikely that many JMS clients will do this.
125     * Support for JTA in the JMS API is targeted at systems vendors who will be
126     * integrating the JMS API into their application server products.
127     * 
128     * @version $Revision: 1.34 $
129     * @see javax.jms.Session
130     * @see javax.jms.QueueSession
131     * @see javax.jms.TopicSession
132     * @see javax.jms.XASession
133     */
134    public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
135            
136            /**
137             * Acknowledgement mode in which message.acknowledge() acknowledges only
138             * that individual message, as opposed to CLIENT_ACKNOWLEDGE, where
139             * acknowledge() acknowledges all messages consumed by the session at
140             * the time it is called.
141             */
142            public static final int INDIVIDUAL_ACKNOWLEDGE=4;
143    
144        /** Callback pair; presumably fired around each message delivery (call sites not visible here). */
145        public interface DeliveryListener {
146            void beforeDelivery(ActiveMQSession session, Message msg);
147            void afterDelivery(ActiveMQSession session, Message msg);
148        }
149    
150        private static final Log LOG = LogFactory.getLog(ActiveMQSession.class);
151    
152        protected int acknowledgementMode; // taken from the constructor's acknowledgeMode argument
153        protected final ActiveMQConnection connection; // the session adds itself to / removes itself from this connection
154        protected final SessionInfo info; // built in the constructor and passed to connection.asyncSendPacket
155        protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
156        protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
157        protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
158        protected final ActiveMQSessionExecutor executor = new ActiveMQSessionExecutor(this); // stopped by dispose()
159        protected final AtomicBoolean started = new AtomicBoolean(false);
160        // Both lists are walked by dispose(), which disposes every entry and then clears them.
161        protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
162        protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
163    
164        protected boolean closed; // becomes true at the end of dispose(); read by checkClosed()
165        protected boolean asyncDispatch;
166        protected boolean sessionAsyncDispatch;
167        protected final boolean debug; // LOG.isDebugEnabled() sampled once in the constructor
168        protected Object sendMutex = new Object(); // NOTE(review): no use visible in this section - presumably guards sends, confirm
169    
170        private MessageListener messageListener; // see getMessageListener()/setMessageListener()
171        private JMSSessionStatsImpl stats; // created in the constructor over the producers and consumers lists
172        private TransactionContext transactionContext; // reset to null by dispose()
173        private DeliveryListener deliveryListener;
174        private MessageTransformer transformer; // constructor calls setTransformer(connection.getTransformer())
175        private BlobTransferPolicy blobTransferPolicy; // constructor calls setBlobTransferPolicy(connection.getBlobTransferPolicy())
176    
177        /**
178         * Constructs the session, announces it through the connection and registers it.
179         * 
180         * @param connection the connection this session belongs to
181         * @param sessionId id whose value is used to build this session's SessionInfo
182         * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
183         *                Session.SESSION_TRANSACTED
184         * @param asyncDispatch stored in the asyncDispatch field
185         * @param sessionAsyncDispatch stored in the sessionAsyncDispatch field
186         * @throws JMSException on internal error
187         */
188        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
189            this.debug = LOG.isDebugEnabled();
190            this.connection = connection;
191            this.acknowledgementMode = acknowledgeMode;
192            this.asyncDispatch = asyncDispatch;
193            this.sessionAsyncDispatch = sessionAsyncDispatch;
194            this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
195            setTransactionContext(new TransactionContext(connection));
196            stats = new JMSSessionStatsImpl(producers, consumers);
197            this.connection.asyncSendPacket(info);
198            setTransformer(connection.getTransformer());
199            setBlobTransferPolicy(connection.getBlobTransferPolicy());
200            // Register only once every field is set, so the connection never sees a half-built
201            // session (e.g. null stats) and a failed asyncSendPacket leaves nothing registered.
202            connection.addSession(this);
203            if (connection.isStarted()) {
204                start();
205            }
206        }
207    
208        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
209            this(connection, sessionId, acknowledgeMode, asyncDispatch, true); // sessionAsyncDispatch defaults to true
210        }
211    
212        /**
213         * Sets the transaction context of the session.
214         * 
215         * @param transactionContext - provides the means to control a JMS
216         *                transaction.
217         */
218        public void setTransactionContext(TransactionContext transactionContext) {
219            this.transactionContext = transactionContext; // the constructor installs a fresh context; dispose() nulls the field
220        }
221    
222        /**
223         * Returns the transaction context of the session.
224         * 
225         * @return transactionContext - session's transaction context.
226         */
227        public TransactionContext getTransactionContext() {
228            return this.transactionContext; // dispose() sets this field to null
229        }
230    
231        /*
232         * (non-Javadoc)
233         * 
234         * @see org.apache.activemq.management.StatsCapable#getStats()
235         */
236        public StatsImpl getStats() {
237            return this.stats; // same object as getSessionStats(), exposed through the StatsImpl type
238        }
239    
240        /**
241         * Returns the session's statistics.
242         * 
243         * @return stats - session's statistics.
244         */
245        public JMSSessionStatsImpl getSessionStats() {
246            return this.stats; // created once in the constructor
247        }
248    
249        /**
250         * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
251         * object is used to send a message containing a stream of uninterpreted
252         * bytes.
253         * 
254         * @return an ActiveMQBytesMessage
255         * @throws JMSException if the JMS provider fails to create this message due
256         *                 to some internal error.
257         */
258        public BytesMessage createBytesMessage() throws JMSException {
259            final ActiveMQBytesMessage bytesMessage = new ActiveMQBytesMessage();
260            configureMessage(bytesMessage); // throws if the session is closed
261            return bytesMessage;
262        }
263    
264        /**
265         * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
266         * object is used to send a self-defining set of name-value pairs, where
267         * names are <CODE>String</CODE> objects and values are primitive values
268         * in the Java programming language.
269         * 
270         * @return an ActiveMQMapMessage
271         * @throws JMSException if the JMS provider fails to create this message due
272         *                 to some internal error.
273         */
274        public MapMessage createMapMessage() throws JMSException {
275            final ActiveMQMapMessage mapMessage = new ActiveMQMapMessage();
276            configureMessage(mapMessage); // throws if the session is closed
277            return mapMessage;
278        }
279    
280        /**
281         * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
282         * interface is the root interface of all JMS messages. A
283         * <CODE>Message</CODE> object holds all the standard message header
284         * information. It can be sent when a message containing only header
285         * information is sufficient.
286         * 
287         * @return an ActiveMQMessage
288         * @throws JMSException if the JMS provider fails to create this message due
289         *                 to some internal error.
290         */
291        public Message createMessage() throws JMSException {
292            final ActiveMQMessage plainMessage = new ActiveMQMessage();
293            configureMessage(plainMessage); // throws if the session is closed
294            return plainMessage;
295        }
296    
297        /**
298         * Creates an <CODE>ObjectMessage</CODE> object. An
299         * <CODE>ObjectMessage</CODE> object is used to send a message that
300         * contains a serializable Java object.
301         * 
302         * @return an ActiveMQObjectMessage
303         * @throws JMSException if the JMS provider fails to create this message due
304         *                 to some internal error.
305         */
306        public ObjectMessage createObjectMessage() throws JMSException {
307            final ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
308            configureMessage(objectMessage); // throws if the session is closed
309            return objectMessage;
310        }
311    
312        /**
313         * Creates an initialized <CODE>ObjectMessage</CODE> object. An
314         * <CODE>ObjectMessage</CODE> object is used to send a message that
315         * contains a serializable Java object.
316         * 
317         * @param object the object to use to initialize this message
318         * @return an ActiveMQObjectMessage
319         * @throws JMSException if the JMS provider fails to create this message due
320         *                 to some internal error.
321         */
322        public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
323            final ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
324            configureMessage(objectMessage); // closed-session check happens before the payload is set
325            objectMessage.setObject(object);
326            return objectMessage;
327        }
328    
329        /**
330         * Creates a <CODE>StreamMessage</CODE> object. A
331         * <CODE>StreamMessage</CODE> object is used to send a self-defining
332         * stream of primitive values in the Java programming language.
333         * 
334         * @return an ActiveMQStreamMessage
335         * @throws JMSException if the JMS provider fails to create this message due
336         *                 to some internal error.
337         */
338        public StreamMessage createStreamMessage() throws JMSException {
339            final ActiveMQStreamMessage streamMessage = new ActiveMQStreamMessage();
340            configureMessage(streamMessage); // throws if the session is closed
341            return streamMessage;
342        }
343    
344        /**
345         * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
346         * object is used to send a message containing a <CODE>String</CODE>
347         * object.
348         * 
349         * @return an ActiveMQTextMessage
350         * @throws JMSException if the JMS provider fails to create this message due
351         *                 to some internal error.
352         */
353        public TextMessage createTextMessage() throws JMSException {
354            final ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
355            configureMessage(textMessage); // throws if the session is closed
356            return textMessage;
357        }
358    
359        /**
360         * Creates an initialized <CODE>TextMessage</CODE> object. A
361         * <CODE>TextMessage</CODE> object is used to send a message containing a
362         * <CODE>String</CODE>.
363         * 
364         * @param text the string used to initialize this message
365         * @return an ActiveMQTextMessage
366         * @throws JMSException if the JMS provider fails to create this message due
367         *                 to some internal error.
368         */
369        public TextMessage createTextMessage(String text) throws JMSException {
370            ActiveMQTextMessage message = new ActiveMQTextMessage();
371            configureMessage(message); // closed-session check first, as in the other factory methods
372            message.setText(text);
373            return message;
374        }
375    
376        /**
377         * Creates an initialized <CODE>BlobMessage</CODE> object. A
378         * <CODE>BlobMessage</CODE> object is used to send a message containing a
379         * <CODE>URL</CODE> which points to some network addressable BLOB.
380         * 
381         * @param url the network addressable URL used to pass directly to the
382         *                consumer
383         * @return a BlobMessage
384         * @throws JMSException if the JMS provider fails to create this message due
385         *                 to some internal error.
386         */
387        public BlobMessage createBlobMessage(URL url) throws JMSException {
388            return createBlobMessage(url, false); // deletedByBroker = false
389        }
390    
391        /**
392         * Creates an initialized <CODE>BlobMessage</CODE> object. A
393         * <CODE>BlobMessage</CODE> object is used to send a message containing a
394         * <CODE>URL</CODE> which points to some network addressable BLOB.
395         * 
396         * @param url the network addressable URL used to pass directly to the
397         *                consumer
398         * @param deletedByBroker indicates whether or not the resource is deleted
399         *                by the broker when the message is acknowledged
400         * @return a BlobMessage
401         * @throws JMSException if the JMS provider fails to create this message due
402         *                 to some internal error.
403         */
404        public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
405            final ActiveMQBlobMessage blob = new ActiveMQBlobMessage();
406            configureMessage(blob); // throws if the session is closed
407            blob.setURL(url);
408            blob.setDeletedByBroker(deletedByBroker);
409            return blob;
410        }
411    
412        /**
413         * Creates an initialized <CODE>BlobMessage</CODE> object. A
414         * <CODE>BlobMessage</CODE> object is used to send a message containing
415         * the <CODE>File</CODE> content. Before the message is sent the file
416         * content will be uploaded to the broker or some other remote repository
417         * depending on the {@link #getBlobTransferPolicy()}.
418         * 
419         * @param file the file to be uploaded to some remote repo (or the broker)
420         *                depending on the strategy
421         * @return a BlobMessage
422         * @throws JMSException if the JMS provider fails to create this message due
423         *                 to some internal error.
424         */
425        public BlobMessage createBlobMessage(File file) throws JMSException {
426            final ActiveMQBlobMessage blob = new ActiveMQBlobMessage();
427            configureMessage(blob); // throws if the session is closed
428            blob.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); // uploader built from the session's policy
429            blob.setDeletedByBroker(true);
430            blob.setName(file.getName()); // the message is named after the file
431            return blob;
432        }
433    
434        /**
435         * Creates an initialized <CODE>BlobMessage</CODE> object. A
436         * <CODE>BlobMessage</CODE> object is used to send a message containing
437         * the <CODE>InputStream</CODE> content. Before the message is sent the
438         * stream content will be uploaded to the broker or some other remote repository
439         * depending on the {@link #getBlobTransferPolicy()}.
440         * 
441         * @param in the stream to be uploaded to some remote repo (or the broker)
442         *                depending on the strategy
443         * @return a BlobMessage
444         * @throws JMSException if the JMS provider fails to create this message due
445         *                 to some internal error.
446         */
447        public BlobMessage createBlobMessage(InputStream in) throws JMSException {
448            final ActiveMQBlobMessage blob = new ActiveMQBlobMessage();
449            configureMessage(blob); // throws if the session is closed
450            blob.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); // uploader built from the session's policy
451            blob.setDeletedByBroker(true);
452            return blob;
453        }
454    
455        /**
456         * Indicates whether the session is in transacted mode.
457         * 
458         * @return true if the session is in transacted mode
459         * @throws JMSException if there is some internal error.
460         */
461        public boolean getTransacted() throws JMSException {
462            checkClosed(); // must come first: dispose() nulls transactionContext when it closes the session
463            return Session.SESSION_TRANSACTED == acknowledgementMode || transactionContext.isInXATransaction();
464        }
465    
466        /**
467         * Returns the acknowledgement mode of the session. The acknowledgement mode
468         * is set at the time that the session is created. If the session is
469         * transacted, the acknowledgement mode is ignored.
470         * 
471         * @return If the session is not transacted, returns the current
472         *         acknowledgement mode for the session. If the session is
473         *         transacted, returns SESSION_TRANSACTED.
474         * @throws JMSException if there is some internal error.
475         * @see javax.jms.Connection#createSession(boolean,int)
476         * @since 1.1
477         */
478        public int getAcknowledgeMode() throws JMSException {
479            checkClosed(); // not answerable on a closed session
480            return acknowledgementMode;
481        }
482    
483        /**
484         * Commits all messages done in this transaction and releases any locks
485         * currently held.
486         * 
487         * @throws JMSException if the JMS provider fails to commit the transaction
488         *                 due to some internal error.
489         * @throws TransactionRolledBackException if the transaction is rolled back
490         *                 due to some internal error during commit.
491         * @throws javax.jms.IllegalStateException if the method is not called by a
492         *                 transacted session.
493         */
494        public void commit() throws JMSException {
495            checkClosed();
496            if (!getTransacted()) { // true for SESSION_TRANSACTED or while in an XA transaction
497                throw new IllegalStateException("Not a transacted session"); // javax.jms type via the explicit import
498            }
499            if (LOG.isDebugEnabled()) {
500                LOG.debug(getSessionId() + " Transaction Commit");
501            }
502            transactionContext.commit(); // the transaction context performs the commit
503        }
504    
505        /**
506         * Rolls back any messages done in this transaction and releases any locks
507         * currently held.
508         * 
509         * @throws JMSException if the JMS provider fails to roll back the
510         *                 transaction due to some internal error.
511         * @throws javax.jms.IllegalStateException if the method is not called by a
512         *                 transacted session.
513         */
514        public void rollback() throws JMSException {
515            checkClosed();
516            if (!getTransacted()) { // true for SESSION_TRANSACTED or while in an XA transaction
517                throw new IllegalStateException("Not a transacted session"); // javax.jms type via the explicit import
518            }
519            if (LOG.isDebugEnabled()) {
520                LOG.debug(getSessionId() + " Transaction Rollback");
521            }
522            transactionContext.rollback(); // the transaction context performs the rollback
523        }
524    
525        /**
526         * Closes the session.
527         * <P>
528         * Since a provider may allocate some resources on behalf of a session
529         * outside the JVM, clients should close the resources when they are not
530         * needed. Relying on garbage collection to eventually reclaim these
531         * resources may not be timely enough.
532         * <P>
533         * There is no need to close the producers and consumers of a closed
534         * session.
535         * <P>
536         * This call will block until a <CODE>receive</CODE> call or message
537         * listener in progress has completed. A blocked message consumer
538         * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
539         * is closed.
540         * <P>
541         * Closing a transacted session must roll back the transaction in progress.
542         * <P>
543         * This method is the only <CODE>Session</CODE> method that can be called
544         * concurrently.
545         * <P>
546         * Invoking any other <CODE>Session</CODE> method on a closed session must
547         * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
548         * closed session must <I>not </I> throw an exception.
549         * 
550         * @throws JMSException if the JMS provider fails to close the session due
551         *                 to some internal error.
552         */
553        public void close() throws JMSException {
554            // Closing an already-closed session is a no-op.
555            if (closed) { return; }
556            dispose(); // local clean-up; marks the session closed
557            connection.asyncSendPacket(info.createRemoveCommand());
558        }
559    
560        void clearMessagesInProgress() {
561            executor.clearMessagesInProgress();
562            // Then forward the same request to every consumer of this session.
563            for (ActiveMQMessageConsumer consumer : consumers) {
564                consumer.clearMessagesInProgress();
565            }
566        }
567    
568        void deliverAcks() {
569            // Forward the request to every consumer of this session.
570            for (ActiveMQMessageConsumer consumer : consumers) {
571                consumer.deliverAcks();
572            }
573        }
574    
575        /**
576         * Stops the executor, disposes all consumers and producers, rolls back any
577         * local transaction and deregisters from the connection; no-op once closed.
578         */
579        public synchronized void dispose() throws JMSException {
580            if (closed) {
581                return;
582            }
583            try {
584                executor.stop();
585                for (ActiveMQMessageConsumer consumer : consumers) {
586                    consumer.dispose();
587                }
588                consumers.clear();
589                for (ActiveMQMessageProducer producer : producers) {
590                    producer.dispose();
591                }
592                producers.clear();
593                try {
594                    if (getTransactionContext().isInLocalTransaction()) {
595                        rollback();
596                    }
597                } catch (JMSException e) {
598                    // Best effort: a failed rollback must not abort the close, but is no longer hidden.
599                    LOG.debug(getSessionId() + " Failed to roll back local transaction during dispose", e);
600                }
601            } finally {
602                connection.removeSession(this);
603                this.transactionContext = null;
604                closed = true;
605            }
606        }
607    
608        /**
609         * Checks that the session is not closed then configures the message
610         */
611        protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
612            checkClosed(); // no messages may be created on a closed session
613            message.setConnection(this.connection);
614        }
615    
616        /**
617         * Check if the session is closed. It is used for ensuring that the session
618         * is open before performing various operations.
619         * 
620         * @throws IllegalStateException if the Session is closed
621         */
622        protected void checkClosed() throws IllegalStateException {
623            // IllegalStateException is the javax.jms type here, per the file's explicit import.
624            if (!closed) { return; }
625            throw new IllegalStateException("The Session is closed");
626        }
627    
628        /**
629         * Stops message delivery in this session, and restarts message delivery
630         * with the oldest unacknowledged message.
631         * <P>
632         * All consumers deliver messages in a serial order. Acknowledging a
633         * received message automatically acknowledges all messages that have been
634         * delivered to the client.
635         * <P>
636         * Restarting a session causes it to take the following actions:
637         * <UL>
638         * <LI>Stop message delivery
639         * <LI>Mark all messages that might have been delivered but not
640         * acknowledged as "redelivered"
641         * <LI>Restart the delivery sequence including all unacknowledged messages
642         * that had been previously delivered. Redelivered messages do not have to
643         * be delivered in exactly their original delivery order.
644         * </UL>
645         * 
646         * @throws JMSException if the JMS provider fails to stop and restart
647         *                 message delivery due to some internal error.
648         * @throws IllegalStateException if the method is called by a transacted
649         *                 session.
650         */
651        public void recover() throws JMSException {
652            checkClosed();
653            // recover() is rejected on transacted sessions.
654            final boolean transacted = getTransacted();
655            if (transacted) {
656                throw new IllegalStateException("This session is transacted");
657            }
658            // Delegate to each consumer's rollback(); the for-each walks the same
659            // iterator over the consumers list that an explicit Iterator loop would.
660            for (ActiveMQMessageConsumer consumer : consumers) {
661                consumer.rollback();
662            }
663        }
664    
665        /**
666         * Returns the session's distinguished message listener (optional).
667         * 
668         * @return the message listener associated with this session
669         * @throws JMSException if the JMS provider fails to get the message
670         *                 listener due to an internal error.
671         * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
672         * @see javax.jms.ServerSessionPool
673         * @see javax.jms.ServerSession
674         */
675        public MessageListener getMessageListener() throws JMSException {
676            checkClosed();
677            return this.messageListener;
678        }
679    
680        /**
681         * Sets the session's distinguished message listener (optional).
682         * <P>
683         * When the distinguished message listener is set, no other form of message
684         * receipt in the session can be used; however, all forms of sending
685         * messages are still supported.
686         * <P>
687         * This is an expert facility not used by regular JMS clients.
688         * 
689         * @param listener the message listener to associate with this session
690         * @throws JMSException if the JMS provider fails to set the message
691         *                 listener due to an internal error.
692         * @see javax.jms.Session#getMessageListener()
693         * @see javax.jms.ServerSessionPool
694         * @see javax.jms.ServerSession
695         */
696        public void setMessageListener(MessageListener listener) throws JMSException {
697            checkClosed();
698            this.messageListener = listener;
699    
700            if (listener != null) {
701                executor.setDispatchedBySessionPool(true);
702            }
703        }
704    
705        /**
706         * Optional operation, intended to be used only by Application Servers, not
707         * by ordinary JMS clients.
708         * 
709         * @see javax.jms.ServerSession
710         */
    public void run() {
        // Drain the executor without blocking: the loop ends as soon as
        // dequeueNoWait() returns null. Each dequeued dispatch is handed to the
        // session-level messageListener and then acknowledged individually.
        MessageDispatch messageDispatch;
        while ((messageDispatch = executor.dequeueNoWait()) != null) {
            // Final alias so the anonymous classes below can capture the dispatch.
            final MessageDispatch md = messageDispatch;
            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
            if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
                // Expired or duplicate messages are skipped here: they are neither
                // passed to the listener nor acknowledged by this method.
                // TODO: Ack it without delivery to client
                continue;
            }

            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                // Install a no-op acknowledge callback on the message; the
                // acknowledgement itself is sent further down in this method.
                message.setAcknowledgeCallback(new Callback() {
                    public void execute() throws Exception {
                    }
                });
            }

            // Optional hook invoked before the listener sees the message.
            if (deliveryListener != null) {
                deliveryListener.beforeDelivery(this, message);
            }

            md.setDeliverySequenceId(getNextDeliveryId());

            try {
                messageListener.onMessage(message);
            } catch (RuntimeException e) {
                LOG.error("error dispatching message: ", e);
                // A problem while invoking the MessageListener does not
                // in general indicate a problem with the connection to the broker, i.e.
                // it will usually be sufficient to let the afterDelivery() method either
                // commit or roll back in order to deal with the exception.
                // However, we notify any registered client internal exception listener
                // of the problem.
                // NOTE(review): the exception is not rethrown, so the ack block
                // below still runs for this message.
                connection.onClientInternalException(e);
            }

            try {
                // Standard ack covering exactly this one message (count = 1).
                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
                ack.setFirstMessageId(md.getMessage().getMessageId());
                // doStartTransaction() must run before the transaction id is read
                // from the transaction context.
                doStartTransaction();
                ack.setTransactionId(getTransactionContext().getTransactionId());
                if (ack.getTransactionId() != null) {
                    // The ack is part of a transaction: register a hook that
                    // handles this message if that transaction is rolled back.
                    getTransactionContext().addSynchronization(new Synchronization() {

                        public void afterRollback() throws Exception {
                            md.getMessage().onMessageRolledBack();
                            // ensure we don't filter this as a duplicate
                            connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
                            RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                            // A maximum is configured and the counter has gone past it.
                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                                && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
                                // We need to NACK the message so that it gets
                                // sent to the DLQ: send a poison ack for this
                                // single message.
                                MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                ack.setFirstMessageId(md.getMessage().getMessageId());
                                asyncSendPacket(ack);
                            } else {
                                
                                // Still within the redelivery limit (or no limit):
                                // send a "redelivered" ack, then schedule a local
                                // re-dispatch.
                                MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                ack.setFirstMessageId(md.getMessage().getMessageId());
                                asyncSendPacket(ack);

                                // Figure out how long we should wait to resend
                                // this message: starting from 0, apply the
                                // policy's getRedeliveryDelay() once per
                                // redelivery already counted.
                                long redeliveryDelay = 0;
                                for (int i = 0; i < redeliveryCounter; i++) {
                                    redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
                                }
                                // After the delay, hand the same dispatch back to
                                // the dispatcher obtained from md.getConsumer().
                                Scheduler.executeAfterDelay(new Runnable() {

                                    public void run() {
                                        ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
                                    }
                                }, redeliveryDelay);
                            }
                        }
                    });
                }
                asyncSendPacket(ack);
            } catch (Throwable e) {
                // Any failure while building or sending the ack is reported to
                // the connection instead of aborting the drain loop.
                connection.onClientInternalException(e);
            }

            // Optional hook invoked once the message has been processed.
            if (deliveryListener != null) {
                deliveryListener.afterDelivery(this, message);
            }
        }
    }
802    
803        /**
804         * Creates a <CODE>MessageProducer</CODE> to send messages to the
805         * specified destination.
806         * <P>
807         * A client uses a <CODE>MessageProducer</CODE> object to send messages to
808         * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
809         * inherit from <CODE>Destination</CODE>, they can be used in the
810         * destination parameter to create a <CODE>MessageProducer</CODE> object.
811         * 
812         * @param destination the <CODE>Destination</CODE> to send to, or null if
813         *                this is a producer which does not have a specified
814         *                destination.
815         * @return the MessageProducer
816         * @throws JMSException if the session fails to create a MessageProducer due
817         *                 to some internal error.
818         * @throws InvalidDestinationException if an invalid destination is
819         *                 specified.
820         * @since 1.1
821         */
822        public MessageProducer createProducer(Destination destination) throws JMSException {
823            checkClosed();
824            if (destination instanceof CustomDestination) {
825                CustomDestination customDestination = (CustomDestination)destination;
826                return customDestination.createProducer(this);
827            }
828            int timeSendOut = connection.getSendTimeout();
829            return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
830        }
831    
832        /**
833         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
834         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
835         * <CODE>Destination</CODE>, they can be used in the destination
836         * parameter to create a <CODE>MessageConsumer</CODE>.
837         * 
838         * @param destination the <CODE>Destination</CODE> to access.
839         * @return the MessageConsumer
840         * @throws JMSException if the session fails to create a consumer due to
841         *                 some internal error.
842         * @throws InvalidDestinationException if an invalid destination is
843         *                 specified.
844         * @since 1.1
845         */
846        public MessageConsumer createConsumer(Destination destination) throws JMSException {
847            return createConsumer(destination, (String) null);
848        }
849    
850        /**
851         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
852         * using a message selector. Since <CODE> Queue</CODE> and
853         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
854         * can be used in the destination parameter to create a
855         * <CODE>MessageConsumer</CODE>.
856         * <P>
857         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
858         * that have been sent to a destination.
859         * 
860         * @param destination the <CODE>Destination</CODE> to access
861         * @param messageSelector only messages with properties matching the message
862         *                selector expression are delivered. A value of null or an
863         *                empty string indicates that there is no message selector
864         *                for the message consumer.
865         * @return the MessageConsumer
866         * @throws JMSException if the session fails to create a MessageConsumer due
867         *                 to some internal error.
868         * @throws InvalidDestinationException if an invalid destination is
869         *                 specified.
870         * @throws InvalidSelectorException if the message selector is invalid.
871         * @since 1.1
872         */
873        public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
874            return createConsumer(destination, messageSelector, false);
875        }
876    
877        /**
878         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
879         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
880         * <CODE>Destination</CODE>, they can be used in the destination
881         * parameter to create a <CODE>MessageConsumer</CODE>.
882         *
883         * @param destination the <CODE>Destination</CODE> to access.
884         * @param messageListener the listener to use for async consumption of messages
885         * @return the MessageConsumer
886         * @throws JMSException if the session fails to create a consumer due to
887         *                 some internal error.
888         * @throws InvalidDestinationException if an invalid destination is
889         *                 specified.
890         * @since 1.1
891         */
892        public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
893            return createConsumer(destination, null, messageListener);
894        }
895    
896        /**
897         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
898         * using a message selector. Since <CODE> Queue</CODE> and
899         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
900         * can be used in the destination parameter to create a
901         * <CODE>MessageConsumer</CODE>.
902         * <P>
903         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
904         * that have been sent to a destination.
905         *
906         * @param destination the <CODE>Destination</CODE> to access
907         * @param messageSelector only messages with properties matching the message
908         *                selector expression are delivered. A value of null or an
909         *                empty string indicates that there is no message selector
910         *                for the message consumer.
911         * @param messageListener the listener to use for async consumption of messages
912         * @return the MessageConsumer
913         * @throws JMSException if the session fails to create a MessageConsumer due
914         *                 to some internal error.
915         * @throws InvalidDestinationException if an invalid destination is
916         *                 specified.
917         * @throws InvalidSelectorException if the message selector is invalid.
918         * @since 1.1
919         */
920        public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
921            return createConsumer(destination, messageSelector, false, messageListener);
922        }
923    
924        /**
925         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
926         * using a message selector. This method can specify whether messages
927         * published by its own connection should be delivered to it, if the
928         * destination is a topic.
929         * <P>
930         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
931         * <CODE>Destination</CODE>, they can be used in the destination
932         * parameter to create a <CODE>MessageConsumer</CODE>.
933         * <P>
934         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
935         * that have been published to a destination.
936         * <P>
937         * In some cases, a connection may both publish and subscribe to a topic.
938         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
939         * inhibit the delivery of messages published by its own connection. The
940         * default value for this attribute is False. The <CODE>noLocal</CODE>
941         * value must be supported by destinations that are topics.
942         * 
943         * @param destination the <CODE>Destination</CODE> to access
944         * @param messageSelector only messages with properties matching the message
945         *                selector expression are delivered. A value of null or an
946         *                empty string indicates that there is no message selector
947         *                for the message consumer.
948         * @param noLocal - if true, and the destination is a topic, inhibits the
949         *                delivery of messages published by its own connection. The
950         *                behavior for <CODE>NoLocal</CODE> is not specified if
951         *                the destination is a queue.
952         * @return the MessageConsumer
953         * @throws JMSException if the session fails to create a MessageConsumer due
954         *                 to some internal error.
955         * @throws InvalidDestinationException if an invalid destination is
956         *                 specified.
957         * @throws InvalidSelectorException if the message selector is invalid.
958         * @since 1.1
959         */
960        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
961            return createConsumer(destination, messageSelector, noLocal, null);
962        }
963    
964        /**
965         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
966         * using a message selector. This method can specify whether messages
967         * published by its own connection should be delivered to it, if the
968         * destination is a topic.
969         * <P>
970         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
971         * <CODE>Destination</CODE>, they can be used in the destination
972         * parameter to create a <CODE>MessageConsumer</CODE>.
973         * <P>
974         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
975         * that have been published to a destination.
976         * <P>
977         * In some cases, a connection may both publish and subscribe to a topic.
978         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
979         * inhibit the delivery of messages published by its own connection. The
980         * default value for this attribute is False. The <CODE>noLocal</CODE>
981         * value must be supported by destinations that are topics.
982         *
983         * @param destination the <CODE>Destination</CODE> to access
984         * @param messageSelector only messages with properties matching the message
985         *                selector expression are delivered. A value of null or an
986         *                empty string indicates that there is no message selector
987         *                for the message consumer.
988         * @param noLocal - if true, and the destination is a topic, inhibits the
989         *                delivery of messages published by its own connection. The
990         *                behavior for <CODE>NoLocal</CODE> is not specified if
991         *                the destination is a queue.
992         * @param messageListener the listener to use for async consumption of messages
993         * @return the MessageConsumer
994         * @throws JMSException if the session fails to create a MessageConsumer due
995         *                 to some internal error.
996         * @throws InvalidDestinationException if an invalid destination is
997         *                 specified.
998         * @throws InvalidSelectorException if the message selector is invalid.
999         * @since 1.1
1000         */
1001        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1002            checkClosed();
1003    
1004            if (destination instanceof CustomDestination) {
1005                CustomDestination customDestination = (CustomDestination)destination;
1006                return customDestination.createConsumer(this, messageSelector, noLocal);
1007            }
1008    
1009            ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1010            int prefetch = 0;
1011            if (destination instanceof Topic) {
1012                prefetch = prefetchPolicy.getTopicPrefetch();
1013            } else {
1014                prefetch = prefetchPolicy.getQueuePrefetch();
1015            }
1016            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1017            return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1018                    prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1019        }
1020    
1021        /**
1022         * Creates a queue identity given a <CODE>Queue</CODE> name.
1023         * <P>
1024         * This facility is provided for the rare cases where clients need to
1025         * dynamically manipulate queue identity. It allows the creation of a queue
1026         * identity with a provider-specific name. Clients that depend on this
1027         * ability are not portable.
1028         * <P>
1029         * Note that this method is not for creating the physical queue. The
1030         * physical creation of queues is an administrative task and is not to be
1031         * initiated by the JMS API. The one exception is the creation of temporary
1032         * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1033         * method.
1034         * 
1035         * @param queueName the name of this <CODE>Queue</CODE>
1036         * @return a <CODE>Queue</CODE> with the given name
1037         * @throws JMSException if the session fails to create a queue due to some
1038         *                 internal error.
1039         * @since 1.1
1040         */
1041        public Queue createQueue(String queueName) throws JMSException {
1042            checkClosed();
1043            if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1044                return new ActiveMQTempQueue(queueName);
1045            }
1046            return new ActiveMQQueue(queueName);
1047        }
1048    
1049        /**
1050         * Creates a topic identity given a <CODE>Topic</CODE> name.
1051         * <P>
1052         * This facility is provided for the rare cases where clients need to
1053         * dynamically manipulate topic identity. This allows the creation of a
1054         * topic identity with a provider-specific name. Clients that depend on this
1055         * ability are not portable.
1056         * <P>
1057         * Note that this method is not for creating the physical topic. The
1058         * physical creation of topics is an administrative task and is not to be
1059         * initiated by the JMS API. The one exception is the creation of temporary
1060         * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1061         * method.
1062         * 
1063         * @param topicName the name of this <CODE>Topic</CODE>
1064         * @return a <CODE>Topic</CODE> with the given name
1065         * @throws JMSException if the session fails to create a topic due to some
1066         *                 internal error.
1067         * @since 1.1
1068         */
1069        public Topic createTopic(String topicName) throws JMSException {
1070            checkClosed();
1071            if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1072                return new ActiveMQTempTopic(topicName);
1073            }
1074            return new ActiveMQTopic(topicName);
1075        }
1076    
1077        /**
1078         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1079         * the specified queue.
1080         * 
1081         * @param queue the <CODE>queue</CODE> to access
1082         * @exception InvalidDestinationException if an invalid destination is
1083         *                    specified
1084         * @since 1.1
1085         */
1086        /**
1087         * Creates a durable subscriber to the specified topic.
1088         * <P>
1089         * If a client needs to receive all the messages published on a topic,
1090         * including the ones published while the subscriber is inactive, it uses a
1091         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1092         * record of this durable subscription and insures that all messages from
1093         * the topic's publishers are retained until they are acknowledged by this
1094         * durable subscriber or they have expired.
1095         * <P>
1096         * Sessions with durable subscribers must always provide the same client
1097         * identifier. In addition, each client must specify a name that uniquely
1098         * identifies (within client identifier) each durable subscription it
1099         * creates. Only one session at a time can have a
1100         * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1101         * <P>
1102         * A client can change an existing durable subscription by creating a
1103         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1104         * and/or message selector. Changing a durable subscriber is equivalent to
1105         * unsubscribing (deleting) the old one and creating a new one.
1106         * <P>
1107         * In some cases, a connection may both publish and subscribe to a topic.
1108         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1109         * inhibit the delivery of messages published by its own connection. The
1110         * default value for this attribute is false.
1111         * 
1112         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1113         * @param name the name used to identify this subscription
1114         * @return the TopicSubscriber
1115         * @throws JMSException if the session fails to create a subscriber due to
1116         *                 some internal error.
1117         * @throws InvalidDestinationException if an invalid topic is specified.
1118         * @since 1.1
1119         */
1120        public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1121            checkClosed();
1122            return createDurableSubscriber(topic, name, null, false);
1123        }
1124    
1125        /**
1126         * Creates a durable subscriber to the specified topic, using a message
1127         * selector and specifying whether messages published by its own connection
1128         * should be delivered to it.
1129         * <P>
1130         * If a client needs to receive all the messages published on a topic,
1131         * including the ones published while the subscriber is inactive, it uses a
1132         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1133         * record of this durable subscription and insures that all messages from
1134         * the topic's publishers are retained until they are acknowledged by this
1135         * durable subscriber or they have expired.
1136         * <P>
1137         * Sessions with durable subscribers must always provide the same client
1138         * identifier. In addition, each client must specify a name which uniquely
1139         * identifies (within client identifier) each durable subscription it
1140         * creates. Only one session at a time can have a
1141         * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1142         * inactive durable subscriber is one that exists but does not currently
1143         * have a message consumer associated with it.
1144         * <P>
1145         * A client can change an existing durable subscription by creating a
1146         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1147         * and/or message selector. Changing a durable subscriber is equivalent to
1148         * unsubscribing (deleting) the old one and creating a new one.
1149         * 
1150         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1151         * @param name the name used to identify this subscription
1152         * @param messageSelector only messages with properties matching the message
1153         *                selector expression are delivered. A value of null or an
1154         *                empty string indicates that there is no message selector
1155         *                for the message consumer.
1156         * @param noLocal if set, inhibits the delivery of messages published by its
1157         *                own connection
1158         * @return the TopicSubscriber
1159         * @throws JMSException if the session fails to create a subscriber due to
1160         *                 some internal error.
1161         * @throws InvalidDestinationException if an invalid topic is specified.
1162         * @throws InvalidSelectorException if the message selector is invalid.
1163         * @since 1.1
1164         */
1165        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1166            checkClosed();
1167    
1168            if (topic instanceof CustomDestination) {
1169                CustomDestination customDestination = (CustomDestination)topic;
1170                return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1171            }
1172    
1173            connection.checkClientIDWasManuallySpecified();
1174            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1175            int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1176            int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1177            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1178                                               noLocal, false, asyncDispatch);
1179        }
1180    
1181        /**
1182         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1183         * the specified queue.
1184         * 
1185         * @param queue the <CODE>queue</CODE> to access
1186         * @return the Queue Browser
1187         * @throws JMSException if the session fails to create a browser due to some
1188         *                 internal error.
1189         * @throws InvalidDestinationException if an invalid destination is
1190         *                 specified
1191         * @since 1.1
1192         */
1193        public QueueBrowser createBrowser(Queue queue) throws JMSException {
1194            checkClosed();
1195            return createBrowser(queue, null);
1196        }
1197    
1198        /**
1199         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1200         * the specified queue using a message selector.
1201         * 
1202         * @param queue the <CODE>queue</CODE> to access
1203         * @param messageSelector only messages with properties matching the message
1204         *                selector expression are delivered. A value of null or an
1205         *                empty string indicates that there is no message selector
1206         *                for the message consumer.
1207         * @return the Queue Browser
1208         * @throws JMSException if the session fails to create a browser due to some
1209         *                 internal error.
1210         * @throws InvalidDestinationException if an invalid destination is
1211         *                 specified
1212         * @throws InvalidSelectorException if the message selector is invalid.
1213         * @since 1.1
1214         */
1215        public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1216            checkClosed();
1217            return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1218        }
1219    
1220        /**
1221         * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1222         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1223         * 
1224         * @return a temporary queue identity
1225         * @throws JMSException if the session fails to create a temporary queue due
1226         *                 to some internal error.
1227         * @since 1.1
1228         */
1229        public TemporaryQueue createTemporaryQueue() throws JMSException {
1230            checkClosed();
1231            return (TemporaryQueue)connection.createTempDestination(false);
1232        }
1233    
1234        /**
1235         * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1236         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1237         * 
1238         * @return a temporary topic identity
1239         * @throws JMSException if the session fails to create a temporary topic due
1240         *                 to some internal error.
1241         * @since 1.1
1242         */
1243        public TemporaryTopic createTemporaryTopic() throws JMSException {
1244            checkClosed();
1245            return (TemporaryTopic)connection.createTempDestination(true);
1246        }
1247    
1248        /**
1249         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1250         * the specified queue.
1251         * 
1252         * @param queue the <CODE>Queue</CODE> to access
1253         * @return
1254         * @throws JMSException if the session fails to create a receiver due to
1255         *                 some internal error.
1256         * @throws JMSException
1257         * @throws InvalidDestinationException if an invalid queue is specified.
1258         */
1259        public QueueReceiver createReceiver(Queue queue) throws JMSException {
1260            checkClosed();
1261            return createReceiver(queue, null);
1262        }
1263    
1264        /**
1265         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1266         * the specified queue using a message selector.
1267         * 
1268         * @param queue the <CODE>Queue</CODE> to access
1269         * @param messageSelector only messages with properties matching the message
1270         *                selector expression are delivered. A value of null or an
1271         *                empty string indicates that there is no message selector
1272         *                for the message consumer.
1273         * @return QueueReceiver
1274         * @throws JMSException if the session fails to create a receiver due to
1275         *                 some internal error.
1276         * @throws InvalidDestinationException if an invalid queue is specified.
1277         * @throws InvalidSelectorException if the message selector is invalid.
1278         */
1279        public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1280            checkClosed();
1281    
1282            if (queue instanceof CustomDestination) {
1283                CustomDestination customDestination = (CustomDestination)queue;
1284                return customDestination.createReceiver(this, messageSelector);
1285            }
1286    
1287            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1288            return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1289                                             prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1290        }
1291    
1292        /**
1293         * Creates a <CODE>QueueSender</CODE> object to send messages to the
1294         * specified queue.
1295         * 
1296         * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1297         *                unidentified producer
1298         * @return QueueSender
1299         * @throws JMSException if the session fails to create a sender due to some
1300         *                 internal error.
1301         * @throws InvalidDestinationException if an invalid queue is specified.
1302         */
1303        public QueueSender createSender(Queue queue) throws JMSException {
1304            checkClosed();
1305            if (queue instanceof CustomDestination) {
1306                CustomDestination customDestination = (CustomDestination)queue;
1307                return customDestination.createSender(this);
1308            }
1309            int timeSendOut = connection.getSendTimeout();
1310            return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1311        }
1312    
1313        /**
1314         * Creates a nondurable subscriber to the specified topic. <p/>
1315         * <P>
1316         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1317         * that have been published to a topic. <p/>
1318         * <P>
1319         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1320         * receive only messages that are published while they are active. <p/>
1321         * <P>
1322         * In some cases, a connection may both publish and subscribe to a topic.
1323         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1324         * inhibit the delivery of messages published by its own connection. The
1325         * default value for this attribute is false.
1326         * 
1327         * @param topic the <CODE>Topic</CODE> to subscribe to
1328         * @return TopicSubscriber
1329         * @throws JMSException if the session fails to create a subscriber due to
1330         *                 some internal error.
1331         * @throws InvalidDestinationException if an invalid topic is specified.
1332         */
1333        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1334            checkClosed();
1335            return createSubscriber(topic, null, false);
1336        }
1337    
1338        /**
1339         * Creates a nondurable subscriber to the specified topic, using a message
1340         * selector or specifying whether messages published by its own connection
1341         * should be delivered to it. <p/>
1342         * <P>
1343         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1344         * that have been published to a topic. <p/>
1345         * <P>
1346         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1347         * receive only messages that are published while they are active. <p/>
1348         * <P>
1349         * Messages filtered out by a subscriber's message selector will never be
1350         * delivered to the subscriber. From the subscriber's perspective, they do
1351         * not exist. <p/>
1352         * <P>
1353         * In some cases, a connection may both publish and subscribe to a topic.
1354         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1355         * inhibit the delivery of messages published by its own connection. The
1356         * default value for this attribute is false.
1357         * 
1358         * @param topic the <CODE>Topic</CODE> to subscribe to
1359         * @param messageSelector only messages with properties matching the message
1360         *                selector expression are delivered. A value of null or an
1361         *                empty string indicates that there is no message selector
1362         *                for the message consumer.
1363         * @param noLocal if set, inhibits the delivery of messages published by its
1364         *                own connection
1365         * @return TopicSubscriber
1366         * @throws JMSException if the session fails to create a subscriber due to
1367         *                 some internal error.
1368         * @throws InvalidDestinationException if an invalid topic is specified.
1369         * @throws InvalidSelectorException if the message selector is invalid.
1370         */
1371        public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1372            checkClosed();
1373    
1374            if (topic instanceof CustomDestination) {
1375                CustomDestination customDestination = (CustomDestination)topic;
1376                return customDestination.createSubscriber(this, messageSelector, noLocal);
1377            }
1378    
1379            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1380            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1381                .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1382        }
1383    
1384        /**
1385         * Creates a publisher for the specified topic. <p/>
1386         * <P>
1387         * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1388         * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1389         * a topic, it defines a new sequence of messages that have no ordering
1390         * relationship with the messages it has previously sent.
1391         * 
1392         * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1393         *                an unidentified producer
1394         * @return TopicPublisher
1395         * @throws JMSException if the session fails to create a publisher due to
1396         *                 some internal error.
1397         * @throws InvalidDestinationException if an invalid topic is specified.
1398         */
1399        public TopicPublisher createPublisher(Topic topic) throws JMSException {
1400            checkClosed();
1401    
1402            if (topic instanceof CustomDestination) {
1403                CustomDestination customDestination = (CustomDestination)topic;
1404                return customDestination.createPublisher(this);
1405            }
1406            int timeSendOut = connection.getSendTimeout();
1407            return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1408        }
1409    
1410        /**
1411         * Unsubscribes a durable subscription that has been created by a client.
1412         * <P>
1413         * This method deletes the state being maintained on behalf of the
1414         * subscriber by its provider.
1415         * <P>
1416         * It is erroneous for a client to delete a durable subscription while there
1417         * is an active <CODE>MessageConsumer </CODE> or
1418         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1419         * message is part of a pending transaction or has not been acknowledged in
1420         * the session.
1421         * 
1422         * @param name the name used to identify this subscription
1423         * @throws JMSException if the session fails to unsubscribe to the durable
1424         *                 subscription due to some internal error.
1425         * @throws InvalidDestinationException if an invalid subscription name is
1426         *                 specified.
1427         * @since 1.1
1428         */
1429        public void unsubscribe(String name) throws JMSException {
1430            checkClosed();
1431            connection.unsubscribe(name);
1432        }
1433    
1434        public void dispatch(MessageDispatch messageDispatch) {
1435            try {
1436                executor.execute(messageDispatch);
1437            } catch (InterruptedException e) {
1438                Thread.currentThread().interrupt();
1439                connection.onClientInternalException(e);
1440            }
1441        }
1442    
1443        /**
1444         * Acknowledges all consumed messages of the session of this consumed
1445         * message.
1446         * <P>
1447         * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1448         * for use when a client has specified that its JMS session's consumed
1449         * messages are to be explicitly acknowledged. By invoking
1450         * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1451         * all messages consumed by the session that the message was delivered to.
1452         * <P>
1453         * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1454         * sessions and sessions specified to use implicit acknowledgement modes.
1455         * <P>
1456         * A client may individually acknowledge each message as it is consumed, or
1457         * it may choose to acknowledge messages as an application-defined group
1458         * (which is done by calling acknowledge on the last received message of the
1459         * group, thereby acknowledging all messages consumed by the session.)
1460         * <P>
1461         * Messages that have been received but not acknowledged may be redelivered.
1462         * 
1463         * @throws JMSException if the JMS provider fails to acknowledge the
1464         *                 messages due to some internal error.
1465         * @throws javax.jms.IllegalStateException if this method is called on a
1466         *                 closed session.
1467         * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1468         */
1469        public void acknowledge() throws JMSException {
1470            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1471                ActiveMQMessageConsumer c = iter.next();
1472                c.acknowledge();
1473            }
1474        }
1475    
1476        /**
1477         * Add a message consumer.
1478         * 
1479         * @param consumer - message consumer.
1480         * @throws JMSException
1481         */
1482        protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1483            this.consumers.add(consumer);
1484            if (consumer.isDurableSubscriber()) {
1485                stats.onCreateDurableSubscriber();
1486            }
1487            this.connection.addDispatcher(consumer.getConsumerId(), this);
1488        }
1489    
1490        /**
1491         * Remove the message consumer.
1492         * 
1493         * @param consumer - consumer to be removed.
1494         * @throws JMSException
1495         */
1496        protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1497            this.connection.removeDispatcher(consumer.getConsumerId());
1498            if (consumer.isDurableSubscriber()) {
1499                stats.onRemoveDurableSubscriber();
1500            }
1501            this.consumers.remove(consumer);
1502            this.connection.removeDispatcher(consumer);
1503        }
1504    
1505        /**
1506         * Adds a message producer.
1507         * 
1508         * @param producer - message producer to be added.
1509         * @throws JMSException
1510         */
1511        protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1512            this.producers.add(producer);
1513            this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1514        }
1515    
1516        /**
1517         * Removes a message producer.
1518         * 
1519         * @param producer - message producer to be removed.
1520         * @throws JMSException
1521         */
1522        protected void removeProducer(ActiveMQMessageProducer producer) {
1523            this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1524            this.producers.remove(producer);
1525        }
1526    
1527        /**
1528         * Start this Session.
1529         * 
1530         * @throws JMSException
1531         */
1532        protected void start() throws JMSException {
1533            started.set(true);
1534            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1535                ActiveMQMessageConsumer c = iter.next();
1536                c.start();
1537            }
1538            executor.start();
1539        }
1540    
1541        /**
1542         * Stops this session.
1543         * 
1544         * @throws JMSException
1545         */
1546        protected void stop() throws JMSException {
1547    
1548            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1549                ActiveMQMessageConsumer c = iter.next();
1550                c.stop();
1551            }
1552    
1553            started.set(false);
1554            executor.stop();
1555        }
1556    
1557        /**
1558         * Returns the session id.
1559         * 
1560         * @return value - session id.
1561         */
1562        protected SessionId getSessionId() {
1563            return info.getSessionId();
1564        }
1565    
1566        /**
1567         * @return
1568         */
1569        protected ConsumerId getNextConsumerId() {
1570            return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1571        }
1572    
1573        /**
1574         * @return
1575         */
1576        protected ProducerId getNextProducerId() {
1577            return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1578        }
1579    
    /**
     * Sends the message for dispatch by the broker.
     * <P>
     * While holding <CODE>sendMutex</CODE>, the message is transformed into an
     * <CODE>ActiveMQMessage</CODE>, stamped with a message id, destination,
     * delivery mode, timestamp/expiration, priority and transaction id, and
     * then handed to the connection either asynchronously or synchronously.
     * 
     * @param producer - message producer; supplies the producer id, the
     *                message sequence number and the disable-timestamp flag.
     * @param destination - message destination. A temporary destination that
     *                the connection reports as deleted is rejected.
     * @param message - message to be sent. If the transformation yields a
     *                different object, the generated message id is also
     *                written back to this message via
     *                <CODE>setJMSMessageID</CODE>.
     * @param deliveryMode - JMS messsage delivery mode.
     * @param priority - message priority.
     * @param timeToLive - message time to live in milliseconds; when greater
     *                than zero it is added to the send timestamp to form the
     *                expiration. The expiration stays 0 when the producer has
     *                message timestamps disabled.
     * @param producerWindow - when not null and the message is sent
     *                asynchronously, its usage is increased by the message
     *                size.
     * @param sendTimeout - when greater than zero the message is always sent
     *                synchronously, with this value passed to the connection
     *                as the timeout.
     * @throws InvalidDestinationException if the destination is a temporary
     *                 destination that the connection reports as deleted.
     * @throws JMSException if the session check, the message preparation or
     *                 the send on the connection fails.
     */
    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                        MemoryUsage producerWindow, int sendTimeout) throws JMSException {

        checkClosed();
        if (destination.isTemporary() && connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        }
        // Everything from transaction start to the actual send runs under sendMutex.
        synchronized (sendMutex) {
            // tell the Broker we are about to start a new transaction
            doStartTransaction();
            TransactionId txid = transactionContext.getTransactionId();
            long sequenceNumber = producer.getMessageSequence();

            // transform to our own message format here
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);

            // Set the message id. Both branches assign the same id; when the
            // transformation produced a different object, the id is also
            // mirrored onto the caller's original message.
            if (msg == message) {
                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
            } else {
                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
                message.setJMSMessageID(msg.getMessageId().toString());
            }
            //clear the brokerPath in case we are re-sending this message
            msg.setBrokerPath(null);

            msg.setJMSDestination(destination);
            msg.setJMSDeliveryMode(deliveryMode);
            // Expiration stays 0 unless timestamps are enabled and timeToLive is positive.
            long expiration = 0L;
            if (!producer.getDisableMessageTimestamp()) {
                long timeStamp = System.currentTimeMillis();
                msg.setJMSTimestamp(timeStamp);
                if (timeToLive > 0) {
                    expiration = timeToLive + timeStamp;
                }
            }
            msg.setJMSExpiration(expiration);
            msg.setJMSPriority(priority);
            msg.setJMSRedelivered(false);

            msg.setTransactionId(txid);
            // From here on a copy is used when the connection asks for copy-on-send.
            if (connection.isCopyMessageOnSend()) {
                msg = (ActiveMQMessage)msg.copy();
            }
            msg.setConnection(connection);
            msg.onSend();
            msg.setProducerId(msg.getMessageId().getProducerId());
            if (this.debug) {
                LOG.debug(getSessionId() + " sending message: " + msg);
            }
            // Async send requires: no send timeout, no response required, the
            // connection not forcing sync sends, and the message being
            // non-persistent, or async send enabled, or part of a transaction.
            if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                this.connection.asyncSendPacket(msg);
                if (producerWindow != null) {
                    // Since we defer lots of the marshaling till we hit the
                    // wire, this might not provide an accurate size. We may
                    // change over to doing more aggressive marshaling to get
                    // more accurate sizes. This is more important once users
                    // start using producer window flow control.
                    int size = msg.getSize();
                    producerWindow.increaseUsage(size);
                }
            } else {
                // Synchronous send, bounded by sendTimeout only when one was given.
                if (sendTimeout > 0) {
                    this.connection.syncSendPacket(msg,sendTimeout);
                }else {
                    this.connection.syncSendPacket(msg);
                }
            }

        }
    }
1665    
1666        /**
1667         * Send TransactionInfo to indicate transaction has started
1668         * 
1669         * @throws JMSException if some internal error occurs
1670         */
1671        protected void doStartTransaction() throws JMSException {
1672            if (getTransacted() && !transactionContext.isInXATransaction()) {
1673                transactionContext.begin();
1674            }
1675        }
1676    
1677        /**
1678         * Checks whether the session has unconsumed messages.
1679         * 
1680         * @return true - if there are unconsumed messages.
1681         */
1682        public boolean hasUncomsumedMessages() {
1683            return executor.hasUncomsumedMessages();
1684        }
1685    
1686        /**
1687         * Checks whether the session uses transactions.
1688         * 
1689         * @return true - if the session uses transactions.
1690         */
1691        public boolean isTransacted() {
1692            return this.acknowledgementMode == Session.SESSION_TRANSACTED;
1693        }
1694    
1695        /**
1696         * Checks whether the session used client acknowledgment.
1697         * 
1698         * @return true - if the session uses client acknowledgment.
1699         */
1700        protected boolean isClientAcknowledge() {
1701            return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1702        }
1703    
1704        /**
1705         * Checks whether the session used auto acknowledgment.
1706         * 
1707         * @return true - if the session uses client acknowledgment.
1708         */
1709        public boolean isAutoAcknowledge() {
1710            return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1711        }
1712    
1713        /**
1714         * Checks whether the session used dup ok acknowledgment.
1715         * 
1716         * @return true - if the session uses client acknowledgment.
1717         */
1718        public boolean isDupsOkAcknowledge() {
1719            return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1720        }
1721        
1722        public boolean isIndividualAcknowledge(){
1723            return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1724        }
1725    
1726        /**
1727         * Returns the message delivery listener.
1728         * 
1729         * @return deliveryListener - message delivery listener.
1730         */
1731        public DeliveryListener getDeliveryListener() {
1732            return deliveryListener;
1733        }
1734    
    /**
     * Sets the message delivery listener of this session. The value is stored
     * as given; no validation is performed here.
     * 
     * @param deliveryListener - message delivery listener to store.
     */
    public void setDeliveryListener(DeliveryListener deliveryListener) {
        this.deliveryListener = deliveryListener;
    }
1743    
1744        /**
1745         * Returns the SessionInfo bean.
1746         * 
1747         * @return info - SessionInfo bean.
1748         * @throws JMSException
1749         */
1750        protected SessionInfo getSessionInfo() throws JMSException {
1751            SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1752            return info;
1753        }
1754    
1755        /**
1756         * Send the asynchronus command.
1757         * 
1758         * @param command - command to be executed.
1759         * @throws JMSException
1760         */
1761        public void asyncSendPacket(Command command) throws JMSException {
1762            connection.asyncSendPacket(command);
1763        }
1764    
1765        /**
1766         * Send the synchronus command.
1767         * 
1768         * @param command - command to be executed.
1769         * @return Response
1770         * @throws JMSException
1771         */
1772        public Response syncSendPacket(Command command) throws JMSException {
1773            return connection.syncSendPacket(command);
1774        }
1775    
1776        public long getNextDeliveryId() {
1777            return deliveryIdGenerator.getNextSequenceId();
1778        }
1779    
1780        public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1781    
1782            List<MessageDispatch> c = unconsumedMessages.removeAll();
1783            for (MessageDispatch md : c) {
1784                this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1785            }
1786            Collections.reverse(c);
1787    
1788            for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1789                MessageDispatch md = iter.next();
1790                executor.executeFirst(md);
1791            }
1792    
1793        }
1794    
1795        public boolean isRunning() {
1796            return started.get();
1797        }
1798    
1799        public boolean isAsyncDispatch() {
1800            return asyncDispatch;
1801        }
1802    
    /**
     * Sets the asyncDispatch flag that this session passes to the browsers,
     * receivers and subscribers it creates afterwards.
     * 
     * @param asyncDispatch the asyncDispatch value to store
     */
    public void setAsyncDispatch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }
1806    
1807        /**
1808         * @return Returns the sessionAsyncDispatch.
1809         */
1810        public boolean isSessionAsyncDispatch() {
1811            return sessionAsyncDispatch;
1812        }
1813    
    /**
     * Stores the sessionAsyncDispatch flag of this session.
     * 
     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
     */
    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
        this.sessionAsyncDispatch = sessionAsyncDispatch;
    }
1820    
1821        public MessageTransformer getTransformer() {
1822            return transformer;
1823        }
1824    
1825        public ActiveMQConnection getConnection() {
1826            return connection;
1827        }
1828    
    /**
     * Sets the transformer used to transform messages before they are sent on
     * to the JMS bus or when they are received from the bus but before they are
     * delivered to the JMS client.
     * 
     * @param transformer the transformer to store; it is stored as given
     */
    public void setTransformer(MessageTransformer transformer) {
        this.transformer = transformer;
    }
1837    
1838        public BlobTransferPolicy getBlobTransferPolicy() {
1839            return blobTransferPolicy;
1840        }
1841    
    /**
     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
     * OBjects) are transferred from producers to brokers to consumers.
     * 
     * @param blobTransferPolicy the policy to store; it is stored as given
     */
    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
        this.blobTransferPolicy = blobTransferPolicy;
    }
1849    
1850        public List getUnconsumedMessages() {
1851            return executor.getUnconsumedMessages();
1852        }
1853    
1854        public String toString() {
1855            return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1856        }
1857    
1858        public void checkMessageListener() throws JMSException {
1859            if (messageListener != null) {
1860                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1861            }
1862            for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
1863                ActiveMQMessageConsumer consumer = i.next();
1864                if (consumer.getMessageListener() != null) {
1865                    throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1866                }
1867            }
1868        }
1869    
1870        protected void setOptimizeAcknowledge(boolean value) {
1871            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1872                ActiveMQMessageConsumer c = iter.next();
1873                c.setOptimizeAcknowledge(value);
1874            }
1875        }
1876    
1877        protected void setPrefetchSize(ConsumerId id, int prefetch) {
1878            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1879                ActiveMQMessageConsumer c = iter.next();
1880                if (c.getConsumerId().equals(id)) {
1881                    c.setPrefetchSize(prefetch);
1882                    break;
1883                }
1884            }
1885        }
1886    
1887        protected void close(ConsumerId id) {
1888            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1889                ActiveMQMessageConsumer c = iter.next();
1890                if (c.getConsumerId().equals(id)) {
1891                    try {
1892                        c.close();
1893                    } catch (JMSException e) {
1894                        LOG.warn("Exception closing consumer", e);
1895                    }
1896                    LOG.warn("Closed consumer on Command");
1897                    break;
1898                }
1899            }
1900        }
1901    
1902        public boolean isInUse(ActiveMQTempDestination destination) {
1903            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1904                ActiveMQMessageConsumer c = iter.next();
1905                if (c.isInUse(destination)) {
1906                    return true;
1907                }
1908            }
1909            return false;
1910        }
1911        
1912        protected void sendAck(MessageAck ack) throws JMSException {
1913            sendAck(ack,false);
1914        }
1915        
1916        protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
1917            if (lazy || connection.isSendAcksAsync() || isTransacted()) {
1918                asyncSendPacket(ack);
1919            } else {
1920                syncSendPacket(ack);
1921            }
1922        }
1923    
1924    }