001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.File;
020    import java.io.InputStream;
021    import java.io.Serializable;
022    import java.net.URL;
023    import java.util.Collections;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.concurrent.CopyOnWriteArrayList;
027    import java.util.concurrent.ThreadPoolExecutor;
028    import java.util.concurrent.atomic.AtomicBoolean;
029    
030    import javax.jms.BytesMessage;
031    import javax.jms.Destination;
032    import javax.jms.IllegalStateException;
033    import javax.jms.InvalidDestinationException;
034    import javax.jms.InvalidSelectorException;
035    import javax.jms.JMSException;
036    import javax.jms.MapMessage;
037    import javax.jms.Message;
038    import javax.jms.MessageConsumer;
039    import javax.jms.MessageListener;
040    import javax.jms.MessageProducer;
041    import javax.jms.ObjectMessage;
042    import javax.jms.Queue;
043    import javax.jms.QueueBrowser;
044    import javax.jms.QueueReceiver;
045    import javax.jms.QueueSender;
046    import javax.jms.QueueSession;
047    import javax.jms.Session;
048    import javax.jms.StreamMessage;
049    import javax.jms.TemporaryQueue;
050    import javax.jms.TemporaryTopic;
051    import javax.jms.TextMessage;
052    import javax.jms.Topic;
053    import javax.jms.TopicPublisher;
054    import javax.jms.TopicSession;
055    import javax.jms.TopicSubscriber;
056    import javax.jms.TransactionRolledBackException;
057    
058    import org.apache.activemq.blob.BlobDownloader;
059    import org.apache.activemq.blob.BlobTransferPolicy;
060    import org.apache.activemq.blob.BlobUploader;
061    import org.apache.activemq.command.ActiveMQBlobMessage;
062    import org.apache.activemq.command.ActiveMQBytesMessage;
063    import org.apache.activemq.command.ActiveMQDestination;
064    import org.apache.activemq.command.ActiveMQMapMessage;
065    import org.apache.activemq.command.ActiveMQMessage;
066    import org.apache.activemq.command.ActiveMQObjectMessage;
067    import org.apache.activemq.command.ActiveMQQueue;
068    import org.apache.activemq.command.ActiveMQStreamMessage;
069    import org.apache.activemq.command.ActiveMQTempDestination;
070    import org.apache.activemq.command.ActiveMQTempQueue;
071    import org.apache.activemq.command.ActiveMQTempTopic;
072    import org.apache.activemq.command.ActiveMQTextMessage;
073    import org.apache.activemq.command.ActiveMQTopic;
074    import org.apache.activemq.command.Command;
075    import org.apache.activemq.command.ConsumerId;
076    import org.apache.activemq.command.MessageAck;
077    import org.apache.activemq.command.MessageDispatch;
078    import org.apache.activemq.command.MessageId;
079    import org.apache.activemq.command.ProducerId;
080    import org.apache.activemq.command.RemoveInfo;
081    import org.apache.activemq.command.Response;
082    import org.apache.activemq.command.SessionId;
083    import org.apache.activemq.command.SessionInfo;
084    import org.apache.activemq.command.TransactionId;
085    import org.apache.activemq.management.JMSSessionStatsImpl;
086    import org.apache.activemq.management.StatsCapable;
087    import org.apache.activemq.management.StatsImpl;
088    import org.apache.activemq.thread.Scheduler;
089    import org.apache.activemq.transaction.Synchronization;
090    import org.apache.activemq.usage.MemoryUsage;
091    import org.apache.activemq.util.Callback;
092    import org.apache.activemq.util.JMSExceptionSupport;
093    import org.apache.activemq.util.LongSequenceGenerator;
094    import org.slf4j.Logger;
095    import org.slf4j.LoggerFactory;
096    
097    /**
098     * <P>
099     * A <CODE>Session</CODE> object is a single-threaded context for producing
100     * and consuming messages. Although it may allocate provider resources outside
101     * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102     * <P>
103     * A session serves several purposes:
104     * <UL>
105     * <LI>It is a factory for its message producers and consumers.
106     * <LI>It supplies provider-optimized message factories.
107     * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108     * <CODE>TemporaryQueues</CODE>.
109     * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110     * objects for those clients that need to dynamically manipulate
111     * provider-specific destination names.
112     * <LI>It supports a single series of transactions that combine work spanning
113     * its producers and consumers into atomic units.
114     * <LI>It defines a serial order for the messages it consumes and the messages
115     * it produces.
116     * <LI>It retains messages it consumes until they have been acknowledged.
117     * <LI>It serializes execution of message listeners registered with its message
118     * consumers.
119     * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120     * </UL>
121     * <P>
122     * A session can create and service multiple message producers and consumers.
123     * <P>
124     * One typical use is to have a thread block on a synchronous
125     * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
126     * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
127     * <P>
128     * If a client desires to have one thread produce messages while others consume
129     * them, the client should use a separate session for its producing thread.
130     * <P>
131     * Once a connection has been started, any session with one or more registered
132     * message listeners is dedicated to the thread of control that delivers
133     * messages to it. It is erroneous for client code to use this session or any of
134     * its constituent objects from another thread of control. The only exception to
135     * this rule is the use of the session or connection <CODE>close</CODE>
136     * method.
137     * <P>
138     * It should be easy for most clients to partition their work naturally into
139     * sessions. This model allows clients to start simply and incrementally add
140     * message processing complexity as their need for concurrency grows.
141     * <P>
142     * The <CODE>close</CODE> method is the only session method that can be called
143     * while some other session method is being executed in another thread.
144     * <P>
145     * A session may be specified as transacted. Each transacted session supports a
146     * single series of transactions. Each transaction groups a set of message sends
147     * and a set of message receives into an atomic unit of work. In effect,
148     * transactions organize a session's input message stream and output message
149     * stream into series of atomic units. When a transaction commits, its atomic
150     * unit of input is acknowledged and its associated atomic unit of output is
151     * sent. If a transaction rollback is done, the transaction's sent messages are
152     * destroyed and the session's input is automatically recovered.
153     * <P>
154     * The content of a transaction's input and output units is simply those
155     * messages that have been produced and consumed within the session's current
156     * transaction.
157     * <P>
158     * A transaction is completed using either its session's <CODE>commit</CODE>
159     * method or its session's <CODE>rollback </CODE> method. The completion of a
160     * session's current transaction automatically begins the next. The result is
161     * that a transacted session always has a current transaction within which its
162     * work is done.
163     * <P>
164     * The Java Transaction Service (JTS) or some other transaction monitor may be
165     * used to combine a session's transaction with transactions on other resources
166     * (databases, other JMS sessions, etc.). Since Java distributed transactions
167     * are controlled via the Java Transaction API (JTA), use of the session's
168     * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169     * prohibited.
170     * <P>
171     * The JMS API does not require support for JTA; however, it does define how a
172     * provider supplies this support.
173     * <P>
174     * Although it is also possible for a JMS client to handle distributed
175     * transactions directly, it is unlikely that many JMS clients will do this.
176     * Support for JTA in the JMS API is targeted at systems vendors who will be
177     * integrating the JMS API into their application server products.
178     *
179     *
180     * @see javax.jms.Session
181     * @see javax.jms.QueueSession
182     * @see javax.jms.TopicSession
183     * @see javax.jms.XASession
184     */
185    public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186    
187        /**
188         * Only acknowledge an individual message - using message.acknowledge()
189         * as opposed to CLIENT_ACKNOWLEDGE which
190         * acknowledges all messages consumed by a session at when acknowledge()
191         * is called
192         */
193        public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
194        public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195    
196        public static interface DeliveryListener {
197            void beforeDelivery(ActiveMQSession session, Message msg);
198    
199            void afterDelivery(ActiveMQSession session, Message msg);
200        }
201    
202        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
203        private final ThreadPoolExecutor connectionExecutor;
204    
205        protected int acknowledgementMode;
206        protected final ActiveMQConnection connection;
207        protected final SessionInfo info;
208        protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
209        protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
210        protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
211        protected final ActiveMQSessionExecutor executor;
212        protected final AtomicBoolean started = new AtomicBoolean(false);
213    
214        protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
215        protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
216    
217        protected boolean closed;
218        private volatile boolean synchronizationRegistered;
219        protected boolean asyncDispatch;
220        protected boolean sessionAsyncDispatch;
221        protected final boolean debug;
222        protected Object sendMutex = new Object();
223    
224        private MessageListener messageListener;
225        private final JMSSessionStatsImpl stats;
226        private TransactionContext transactionContext;
227        private DeliveryListener deliveryListener;
228        private MessageTransformer transformer;
229        private BlobTransferPolicy blobTransferPolicy;
230        private long lastDeliveredSequenceId;
231    
232        /**
233         * Construct the Session
234         *
235         * @param connection
236         * @param sessionId
237         * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
238         *                Session.SESSION_TRANSACTED
239         * @param asyncDispatch
240         * @param sessionAsyncDispatch
241         * @throws JMSException on internal error
242         */
243        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
244            this.debug = LOG.isDebugEnabled();
245            this.connection = connection;
246            this.acknowledgementMode = acknowledgeMode;
247            this.asyncDispatch = asyncDispatch;
248            this.sessionAsyncDispatch = sessionAsyncDispatch;
249            this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
250            setTransactionContext(new TransactionContext(connection));
251            stats = new JMSSessionStatsImpl(producers, consumers);
252            this.connection.asyncSendPacket(info);
253            setTransformer(connection.getTransformer());
254            setBlobTransferPolicy(connection.getBlobTransferPolicy());
255            this.connectionExecutor=connection.getExecutor();
256            this.executor = new ActiveMQSessionExecutor(this);
257            connection.addSession(this);
258            if (connection.isStarted()) {
259                start();
260            }
261    
262        }
263    
264        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
265            this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
266        }
267    
268        /**
269         * Sets the transaction context of the session.
270         *
271         * @param transactionContext - provides the means to control a JMS
272         *                transaction.
273         */
274        public void setTransactionContext(TransactionContext transactionContext) {
275            this.transactionContext = transactionContext;
276        }
277    
278        /**
279         * Returns the transaction context of the session.
280         *
281         * @return transactionContext - session's transaction context.
282         */
283        public TransactionContext getTransactionContext() {
284            return transactionContext;
285        }
286    
287        /*
288         * (non-Javadoc)
289         *
290         * @see org.apache.activemq.management.StatsCapable#getStats()
291         */
292        public StatsImpl getStats() {
293            return stats;
294        }
295    
296        /**
297         * Returns the session's statistics.
298         *
299         * @return stats - session's statistics.
300         */
301        public JMSSessionStatsImpl getSessionStats() {
302            return stats;
303        }
304    
305        /**
306         * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
307         * object is used to send a message containing a stream of uninterpreted
308         * bytes.
309         *
310         * @return the an ActiveMQBytesMessage
311         * @throws JMSException if the JMS provider fails to create this message due
312         *                 to some internal error.
313         */
314        public BytesMessage createBytesMessage() throws JMSException {
315            ActiveMQBytesMessage message = new ActiveMQBytesMessage();
316            configureMessage(message);
317            return message;
318        }
319    
320        /**
321         * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
322         * object is used to send a self-defining set of name-value pairs, where
323         * names are <CODE>String</CODE> objects and values are primitive values
324         * in the Java programming language.
325         *
326         * @return an ActiveMQMapMessage
327         * @throws JMSException if the JMS provider fails to create this message due
328         *                 to some internal error.
329         */
330        public MapMessage createMapMessage() throws JMSException {
331            ActiveMQMapMessage message = new ActiveMQMapMessage();
332            configureMessage(message);
333            return message;
334        }
335    
336        /**
337         * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
338         * interface is the root interface of all JMS messages. A
339         * <CODE>Message</CODE> object holds all the standard message header
340         * information. It can be sent when a message containing only header
341         * information is sufficient.
342         *
343         * @return an ActiveMQMessage
344         * @throws JMSException if the JMS provider fails to create this message due
345         *                 to some internal error.
346         */
347        public Message createMessage() throws JMSException {
348            ActiveMQMessage message = new ActiveMQMessage();
349            configureMessage(message);
350            return message;
351        }
352    
353        /**
354         * Creates an <CODE>ObjectMessage</CODE> object. An
355         * <CODE>ObjectMessage</CODE> object is used to send a message that
356         * contains a serializable Java object.
357         *
358         * @return an ActiveMQObjectMessage
359         * @throws JMSException if the JMS provider fails to create this message due
360         *                 to some internal error.
361         */
362        public ObjectMessage createObjectMessage() throws JMSException {
363            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
364            configureMessage(message);
365            return message;
366        }
367    
368        /**
369         * Creates an initialized <CODE>ObjectMessage</CODE> object. An
370         * <CODE>ObjectMessage</CODE> object is used to send a message that
371         * contains a serializable Java object.
372         *
373         * @param object the object to use to initialize this message
374         * @return an ActiveMQObjectMessage
375         * @throws JMSException if the JMS provider fails to create this message due
376         *                 to some internal error.
377         */
378        public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
379            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
380            configureMessage(message);
381            message.setObject(object);
382            return message;
383        }
384    
385        /**
386         * Creates a <CODE>StreamMessage</CODE> object. A
387         * <CODE>StreamMessage</CODE> object is used to send a self-defining
388         * stream of primitive values in the Java programming language.
389         *
390         * @return an ActiveMQStreamMessage
391         * @throws JMSException if the JMS provider fails to create this message due
392         *                 to some internal error.
393         */
394        public StreamMessage createStreamMessage() throws JMSException {
395            ActiveMQStreamMessage message = new ActiveMQStreamMessage();
396            configureMessage(message);
397            return message;
398        }
399    
400        /**
401         * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
402         * object is used to send a message containing a <CODE>String</CODE>
403         * object.
404         *
405         * @return an ActiveMQTextMessage
406         * @throws JMSException if the JMS provider fails to create this message due
407         *                 to some internal error.
408         */
409        public TextMessage createTextMessage() throws JMSException {
410            ActiveMQTextMessage message = new ActiveMQTextMessage();
411            configureMessage(message);
412            return message;
413        }
414    
415        /**
416         * Creates an initialized <CODE>TextMessage</CODE> object. A
417         * <CODE>TextMessage</CODE> object is used to send a message containing a
418         * <CODE>String</CODE>.
419         *
420         * @param text the string used to initialize this message
421         * @return an ActiveMQTextMessage
422         * @throws JMSException if the JMS provider fails to create this message due
423         *                 to some internal error.
424         */
425        public TextMessage createTextMessage(String text) throws JMSException {
426            ActiveMQTextMessage message = new ActiveMQTextMessage();
427            message.setText(text);
428            configureMessage(message);
429            return message;
430        }
431    
432        /**
433         * Creates an initialized <CODE>BlobMessage</CODE> object. A
434         * <CODE>BlobMessage</CODE> object is used to send a message containing a
435         * <CODE>URL</CODE> which points to some network addressible BLOB.
436         *
437         * @param url the network addressable URL used to pass directly to the
438         *                consumer
439         * @return a BlobMessage
440         * @throws JMSException if the JMS provider fails to create this message due
441         *                 to some internal error.
442         */
443        public BlobMessage createBlobMessage(URL url) throws JMSException {
444            return createBlobMessage(url, false);
445        }
446    
447        /**
448         * Creates an initialized <CODE>BlobMessage</CODE> object. A
449         * <CODE>BlobMessage</CODE> object is used to send a message containing a
450         * <CODE>URL</CODE> which points to some network addressible BLOB.
451         *
452         * @param url the network addressable URL used to pass directly to the
453         *                consumer
454         * @param deletedByBroker indicates whether or not the resource is deleted
455         *                by the broker when the message is acknowledged
456         * @return a BlobMessage
457         * @throws JMSException if the JMS provider fails to create this message due
458         *                 to some internal error.
459         */
460        public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
461            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
462            configureMessage(message);
463            message.setURL(url);
464            message.setDeletedByBroker(deletedByBroker);
465            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
466            return message;
467        }
468    
469        /**
470         * Creates an initialized <CODE>BlobMessage</CODE> object. A
471         * <CODE>BlobMessage</CODE> object is used to send a message containing
472         * the <CODE>File</CODE> content. Before the message is sent the file
473         * conent will be uploaded to the broker or some other remote repository
474         * depending on the {@link #getBlobTransferPolicy()}.
475         *
476         * @param file the file to be uploaded to some remote repo (or the broker)
477         *                depending on the strategy
478         * @return a BlobMessage
479         * @throws JMSException if the JMS provider fails to create this message due
480         *                 to some internal error.
481         */
482        public BlobMessage createBlobMessage(File file) throws JMSException {
483            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
484            configureMessage(message);
485            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
486            message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
487            message.setDeletedByBroker(true);
488            message.setName(file.getName());
489            return message;
490        }
491    
492        /**
493         * Creates an initialized <CODE>BlobMessage</CODE> object. A
494         * <CODE>BlobMessage</CODE> object is used to send a message containing
495         * the <CODE>File</CODE> content. Before the message is sent the file
496         * conent will be uploaded to the broker or some other remote repository
497         * depending on the {@link #getBlobTransferPolicy()}.
498         *
499         * @param in the stream to be uploaded to some remote repo (or the broker)
500         *                depending on the strategy
501         * @return a BlobMessage
502         * @throws JMSException if the JMS provider fails to create this message due
503         *                 to some internal error.
504         */
505        public BlobMessage createBlobMessage(InputStream in) throws JMSException {
506            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
507            configureMessage(message);
508            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
509            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
510            message.setDeletedByBroker(true);
511            return message;
512        }
513    
514        /**
515         * Indicates whether the session is in transacted mode.
516         *
517         * @return true if the session is in transacted mode
518         * @throws JMSException if there is some internal error.
519         */
520        public boolean getTransacted() throws JMSException {
521            checkClosed();
522            return isTransacted();
523        }
524    
525        /**
526         * Returns the acknowledgement mode of the session. The acknowledgement mode
527         * is set at the time that the session is created. If the session is
528         * transacted, the acknowledgement mode is ignored.
529         *
530         * @return If the session is not transacted, returns the current
531         *         acknowledgement mode for the session. If the session is
532         *         transacted, returns SESSION_TRANSACTED.
533         * @throws JMSException
534         * @see javax.jms.Connection#createSession(boolean,int)
535         * @since 1.1 exception JMSException if there is some internal error.
536         */
537        public int getAcknowledgeMode() throws JMSException {
538            checkClosed();
539            return this.acknowledgementMode;
540        }
541    
542        /**
543         * Commits all messages done in this transaction and releases any locks
544         * currently held.
545         *
546         * @throws JMSException if the JMS provider fails to commit the transaction
547         *                 due to some internal error.
548         * @throws TransactionRolledBackException if the transaction is rolled back
549         *                 due to some internal error during commit.
550         * @throws javax.jms.IllegalStateException if the method is not called by a
551         *                 transacted session.
552         */
553        public void commit() throws JMSException {
554            checkClosed();
555            if (!getTransacted()) {
556                throw new javax.jms.IllegalStateException("Not a transacted session");
557            }
558            if (LOG.isDebugEnabled()) {
559                LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
560            }
561            transactionContext.commit();
562        }
563    
564        /**
565         * Rolls back any messages done in this transaction and releases any locks
566         * currently held.
567         *
568         * @throws JMSException if the JMS provider fails to roll back the
569         *                 transaction due to some internal error.
570         * @throws javax.jms.IllegalStateException if the method is not called by a
571         *                 transacted session.
572         */
573        public void rollback() throws JMSException {
574            checkClosed();
575            if (!getTransacted()) {
576                throw new javax.jms.IllegalStateException("Not a transacted session");
577            }
578            if (LOG.isDebugEnabled()) {
579                LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
580            }
581            transactionContext.rollback();
582        }
583    
584        /**
585         * Closes the session.
586         * <P>
587         * Since a provider may allocate some resources on behalf of a session
588         * outside the JVM, clients should close the resources when they are not
589         * needed. Relying on garbage collection to eventually reclaim these
590         * resources may not be timely enough.
591         * <P>
592         * There is no need to close the producers and consumers of a closed
593         * session.
594         * <P>
595         * This call will block until a <CODE>receive</CODE> call or message
596         * listener in progress has completed. A blocked message consumer
597         * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
598         * is closed.
599         * <P>
600         * Closing a transacted session must roll back the transaction in progress.
601         * <P>
602         * This method is the only <CODE>Session</CODE> method that can be called
603         * concurrently.
604         * <P>
605         * Invoking any other <CODE>Session</CODE> method on a closed session must
606         * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
607         * closed session must <I>not </I> throw an exception.
608         *
609         * @throws JMSException if the JMS provider fails to close the session due
610         *                 to some internal error.
611         */
612        public void close() throws JMSException {
613            if (!closed) {
614                if (getTransactionContext().isInXATransaction()) {
615                    if (!synchronizationRegistered) {
616                        synchronizationRegistered = true;
617                        getTransactionContext().addSynchronization(new Synchronization() {
618    
619                                            @Override
620                                            public void afterCommit() throws Exception {
621                                                doClose();
622                                                synchronizationRegistered = false;
623                                            }
624    
625                                            @Override
626                                            public void afterRollback() throws Exception {
627                                                doClose();
628                                                synchronizationRegistered = false;
629                                            }
630                                        });
631                    }
632    
633                } else {
634                    doClose();
635                }
636            }
637        }
638    
639        private void doClose() throws JMSException {
640            boolean interrupted = Thread.interrupted();
641            dispose();
642            RemoveInfo removeCommand = info.createRemoveCommand();
643            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
644            connection.asyncSendPacket(removeCommand);
645            if (interrupted) {
646                Thread.currentThread().interrupt();
647            }
648        }
649    
650        void clearMessagesInProgress() {
651            executor.clearMessagesInProgress();
652            // we are called from inside the transport reconnection logic
653            // which involves us clearing all the connections' consumers
654            // dispatch and delivered lists. So rather than trying to
655            // grab a mutex (which could be already owned by the message
656            // listener calling the send or an ack) we allow it to complete in
657            // a separate thread via the scheduler and notify us via
658            // connection.transportInterruptionProcessingComplete()
659            //
660            for (final ActiveMQMessageConsumer consumer : consumers) {
661                consumer.inProgressClearRequired();
662                try {
663                    connection.getScheduler().executeAfterDelay(new Runnable() {
664                        public void run() {
665                            consumer.clearMessagesInProgress();
666                        }}, 0l);
667                } catch (JMSException e) {
668                    connection.onClientInternalException(e);
669                }
670            }
671        }
672    
673        void deliverAcks() {
674            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
675                ActiveMQMessageConsumer consumer = iter.next();
676                consumer.deliverAcks();
677            }
678        }
679    
680        public synchronized void dispose() throws JMSException {
681            if (!closed) {
682    
683                try {
684                    executor.stop();
685    
686                    for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
687                        ActiveMQMessageConsumer consumer = iter.next();
688                        consumer.setFailureError(connection.getFirstFailureError());
689                        consumer.dispose();
690                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
691                    }
692                    consumers.clear();
693    
694                    for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
695                        ActiveMQMessageProducer producer = iter.next();
696                        producer.dispose();
697                    }
698                    producers.clear();
699    
700                    try {
701                        if (getTransactionContext().isInLocalTransaction()) {
702                            rollback();
703                        }
704                    } catch (JMSException e) {
705                    }
706    
707                } finally {
708                    connection.removeSession(this);
709                    this.transactionContext = null;
710                    closed = true;
711                }
712            }
713        }
714    
715        /**
716         * Checks that the session is not closed then configures the message
717         */
718        protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
719            checkClosed();
720            message.setConnection(connection);
721        }
722    
723        /**
724         * Check if the session is closed. It is used for ensuring that the session
725         * is open before performing various operations.
726         *
727         * @throws IllegalStateException if the Session is closed
728         */
729        protected void checkClosed() throws IllegalStateException {
730            if (closed) {
731                throw new IllegalStateException("The Session is closed");
732            }
733        }
734    
735        /**
736         * Checks if the session is closed.
737         *
738         * @return true if the session is closed, false otherwise.
739         */
740        public boolean isClosed() {
741            return closed;
742        }
743    
744        /**
745         * Stops message delivery in this session, and restarts message delivery
746         * with the oldest unacknowledged message.
747         * <P>
748         * All consumers deliver messages in a serial order. Acknowledging a
749         * received message automatically acknowledges all messages that have been
750         * delivered to the client.
751         * <P>
752         * Restarting a session causes it to take the following actions:
753         * <UL>
754         * <LI>Stop message delivery
755         * <LI>Mark all messages that might have been delivered but not
756         * acknowledged as "redelivered"
757         * <LI>Restart the delivery sequence including all unacknowledged messages
758         * that had been previously delivered. Redelivered messages do not have to
759         * be delivered in exactly their original delivery order.
760         * </UL>
761         *
762         * @throws JMSException if the JMS provider fails to stop and restart
763         *                 message delivery due to some internal error.
764         * @throws IllegalStateException if the method is called by a transacted
765         *                 session.
766         */
767        public void recover() throws JMSException {
768    
769            checkClosed();
770            if (getTransacted()) {
771                throw new IllegalStateException("This session is transacted");
772            }
773    
774            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
775                ActiveMQMessageConsumer c = iter.next();
776                c.rollback();
777            }
778    
779        }
780    
781        /**
782         * Returns the session's distinguished message listener (optional).
783         *
784         * @return the message listener associated with this session
785         * @throws JMSException if the JMS provider fails to get the message
786         *                 listener due to an internal error.
787         * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
788         * @see javax.jms.ServerSessionPool
789         * @see javax.jms.ServerSession
790         */
791        public MessageListener getMessageListener() throws JMSException {
792            checkClosed();
793            return this.messageListener;
794        }
795    
796        /**
797         * Sets the session's distinguished message listener (optional).
798         * <P>
799         * When the distinguished message listener is set, no other form of message
800         * receipt in the session can be used; however, all forms of sending
801         * messages are still supported.
802         * <P>
803         * If this session has been closed, then an {@link IllegalStateException} is
804         * thrown, if trying to set a new listener. However setting the listener
805         * to <tt>null</tt> is allowed, to clear the listener, even if this session
806         * has been closed prior.
807         * <P>
808         * This is an expert facility not used by regular JMS clients.
809         *
810         * @param listener the message listener to associate with this session
811         * @throws JMSException if the JMS provider fails to set the message
812         *                 listener due to an internal error.
813         * @see javax.jms.Session#getMessageListener()
814         * @see javax.jms.ServerSessionPool
815         * @see javax.jms.ServerSession
816         */
817        public void setMessageListener(MessageListener listener) throws JMSException {
818            // only check for closed if we set a new listener, as we allow to clear
819            // the listener, such as when an application is shutting down, and is
820            // no longer using a message listener on this session
821            if (listener != null) {
822                checkClosed();
823            }
824            this.messageListener = listener;
825    
826            if (listener != null) {
827                executor.setDispatchedBySessionPool(true);
828            }
829        }
830    
831        /**
832         * Optional operation, intended to be used only by Application Servers, not
833         * by ordinary JMS clients.
834         *
835         * @see javax.jms.ServerSession
836         */
837        public void run() {
838            MessageDispatch messageDispatch;
839            while ((messageDispatch = executor.dequeueNoWait()) != null) {
840                final MessageDispatch md = messageDispatch;
841                ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
842                if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
843                    // TODO: Ack it without delivery to client
844                    continue;
845                }
846    
847                if (isClientAcknowledge()||isIndividualAcknowledge()) {
848                    message.setAcknowledgeCallback(new Callback() {
849                        public void execute() throws Exception {
850                        }
851                    });
852                }
853    
854                if (deliveryListener != null) {
855                    deliveryListener.beforeDelivery(this, message);
856                }
857    
858                md.setDeliverySequenceId(getNextDeliveryId());
859    
860                try {
861                    messageListener.onMessage(message);
862                } catch (RuntimeException e) {
863                    LOG.error("error dispatching message: ", e);
864                    // A problem while invoking the MessageListener does not
865                    // in general indicate a problem with the connection to the broker, i.e.
866                    // it will usually be sufficient to let the afterDelivery() method either
867                    // commit or roll back in order to deal with the exception.
868                    // However, we notify any registered client internal exception listener
869                    // of the problem.
870                    connection.onClientInternalException(e);
871                }
872    
873                try {
874                    MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
875                    ack.setFirstMessageId(md.getMessage().getMessageId());
876                    doStartTransaction();
877                    ack.setTransactionId(getTransactionContext().getTransactionId());
878                    if (ack.getTransactionId() != null) {
879                        getTransactionContext().addSynchronization(new Synchronization() {
880    
881                            @Override
882                            public void afterRollback() throws Exception {
883                                md.getMessage().onMessageRolledBack();
884                                // ensure we don't filter this as a duplicate
885                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
886                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
887                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
888                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
889                                    && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
890                                    // We need to NACK the messages so that they get
891                                    // sent to the
892                                    // DLQ.
893                                    // Acknowledge the last message.
894                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
895                                    ack.setFirstMessageId(md.getMessage().getMessageId());
896                                    asyncSendPacket(ack);
897                                } else {
898    
899                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
900                                    ack.setFirstMessageId(md.getMessage().getMessageId());
901                                    asyncSendPacket(ack);
902    
903                                    // Figure out how long we should wait to resend
904                                    // this message.
905                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
906                                    for (int i = 0; i < redeliveryCounter; i++) {
907                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
908                                    }
909                                    connection.getScheduler().executeAfterDelay(new Runnable() {
910    
911                                        public void run() {
912                                            ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
913                                        }
914                                    }, redeliveryDelay);
915                                }
916                            }
917                        });
918                    }
919                    asyncSendPacket(ack);
920                } catch (Throwable e) {
921                    connection.onClientInternalException(e);
922                }
923    
924                if (deliveryListener != null) {
925                    deliveryListener.afterDelivery(this, message);
926                }
927            }
928        }
929    
930        /**
931         * Creates a <CODE>MessageProducer</CODE> to send messages to the
932         * specified destination.
933         * <P>
934         * A client uses a <CODE>MessageProducer</CODE> object to send messages to
935         * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
936         * inherit from <CODE>Destination</CODE>, they can be used in the
937         * destination parameter to create a <CODE>MessageProducer</CODE> object.
938         *
939         * @param destination the <CODE>Destination</CODE> to send to, or null if
940         *                this is a producer which does not have a specified
941         *                destination.
942         * @return the MessageProducer
943         * @throws JMSException if the session fails to create a MessageProducer due
944         *                 to some internal error.
945         * @throws InvalidDestinationException if an invalid destination is
946         *                 specified.
947         * @since 1.1
948         */
949        public MessageProducer createProducer(Destination destination) throws JMSException {
950            checkClosed();
951            if (destination instanceof CustomDestination) {
952                CustomDestination customDestination = (CustomDestination)destination;
953                return customDestination.createProducer(this);
954            }
955            int timeSendOut = connection.getSendTimeout();
956            return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
957        }
958    
959        /**
960         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
961         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
962         * <CODE>Destination</CODE>, they can be used in the destination
963         * parameter to create a <CODE>MessageConsumer</CODE>.
964         *
965         * @param destination the <CODE>Destination</CODE> to access.
966         * @return the MessageConsumer
967         * @throws JMSException if the session fails to create a consumer due to
968         *                 some internal error.
969         * @throws InvalidDestinationException if an invalid destination is
970         *                 specified.
971         * @since 1.1
972         */
973        public MessageConsumer createConsumer(Destination destination) throws JMSException {
974            return createConsumer(destination, (String) null);
975        }
976    
977        /**
978         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
979         * using a message selector. Since <CODE> Queue</CODE> and
980         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
981         * can be used in the destination parameter to create a
982         * <CODE>MessageConsumer</CODE>.
983         * <P>
984         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
985         * that have been sent to a destination.
986         *
987         * @param destination the <CODE>Destination</CODE> to access
988         * @param messageSelector only messages with properties matching the message
989         *                selector expression are delivered. A value of null or an
990         *                empty string indicates that there is no message selector
991         *                for the message consumer.
992         * @return the MessageConsumer
993         * @throws JMSException if the session fails to create a MessageConsumer due
994         *                 to some internal error.
995         * @throws InvalidDestinationException if an invalid destination is
996         *                 specified.
997         * @throws InvalidSelectorException if the message selector is invalid.
998         * @since 1.1
999         */
1000        public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
1001            return createConsumer(destination, messageSelector, false);
1002        }
1003    
1004        /**
1005         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1006         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1007         * <CODE>Destination</CODE>, they can be used in the destination
1008         * parameter to create a <CODE>MessageConsumer</CODE>.
1009         *
1010         * @param destination the <CODE>Destination</CODE> to access.
1011         * @param messageListener the listener to use for async consumption of messages
1012         * @return the MessageConsumer
1013         * @throws JMSException if the session fails to create a consumer due to
1014         *                 some internal error.
1015         * @throws InvalidDestinationException if an invalid destination is
1016         *                 specified.
1017         * @since 1.1
1018         */
1019        public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1020            return createConsumer(destination, null, messageListener);
1021        }
1022    
1023        /**
1024         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1025         * using a message selector. Since <CODE> Queue</CODE> and
1026         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1027         * can be used in the destination parameter to create a
1028         * <CODE>MessageConsumer</CODE>.
1029         * <P>
1030         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1031         * that have been sent to a destination.
1032         *
1033         * @param destination the <CODE>Destination</CODE> to access
1034         * @param messageSelector only messages with properties matching the message
1035         *                selector expression are delivered. A value of null or an
1036         *                empty string indicates that there is no message selector
1037         *                for the message consumer.
1038         * @param messageListener the listener to use for async consumption of messages
1039         * @return the MessageConsumer
1040         * @throws JMSException if the session fails to create a MessageConsumer due
1041         *                 to some internal error.
1042         * @throws InvalidDestinationException if an invalid destination is
1043         *                 specified.
1044         * @throws InvalidSelectorException if the message selector is invalid.
1045         * @since 1.1
1046         */
1047        public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1048            return createConsumer(destination, messageSelector, false, messageListener);
1049        }
1050    
1051        /**
1052         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1053         * using a message selector. This method can specify whether messages
1054         * published by its own connection should be delivered to it, if the
1055         * destination is a topic.
1056         * <P>
1057         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1058         * <CODE>Destination</CODE>, they can be used in the destination
1059         * parameter to create a <CODE>MessageConsumer</CODE>.
1060         * <P>
1061         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1062         * that have been published to a destination.
1063         * <P>
1064         * In some cases, a connection may both publish and subscribe to a topic.
1065         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1066         * inhibit the delivery of messages published by its own connection. The
1067         * default value for this attribute is False. The <CODE>noLocal</CODE>
1068         * value must be supported by destinations that are topics.
1069         *
1070         * @param destination the <CODE>Destination</CODE> to access
1071         * @param messageSelector only messages with properties matching the message
1072         *                selector expression are delivered. A value of null or an
1073         *                empty string indicates that there is no message selector
1074         *                for the message consumer.
1075         * @param noLocal - if true, and the destination is a topic, inhibits the
1076         *                delivery of messages published by its own connection. The
1077         *                behavior for <CODE>NoLocal</CODE> is not specified if
1078         *                the destination is a queue.
1079         * @return the MessageConsumer
1080         * @throws JMSException if the session fails to create a MessageConsumer due
1081         *                 to some internal error.
1082         * @throws InvalidDestinationException if an invalid destination is
1083         *                 specified.
1084         * @throws InvalidSelectorException if the message selector is invalid.
1085         * @since 1.1
1086         */
1087        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1088            return createConsumer(destination, messageSelector, noLocal, null);
1089        }
1090    
1091        /**
1092         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1093         * using a message selector. This method can specify whether messages
1094         * published by its own connection should be delivered to it, if the
1095         * destination is a topic.
1096         * <P>
1097         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1098         * <CODE>Destination</CODE>, they can be used in the destination
1099         * parameter to create a <CODE>MessageConsumer</CODE>.
1100         * <P>
1101         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1102         * that have been published to a destination.
1103         * <P>
1104         * In some cases, a connection may both publish and subscribe to a topic.
1105         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1106         * inhibit the delivery of messages published by its own connection. The
1107         * default value for this attribute is False. The <CODE>noLocal</CODE>
1108         * value must be supported by destinations that are topics.
1109         *
1110         * @param destination the <CODE>Destination</CODE> to access
1111         * @param messageSelector only messages with properties matching the message
1112         *                selector expression are delivered. A value of null or an
1113         *                empty string indicates that there is no message selector
1114         *                for the message consumer.
1115         * @param noLocal - if true, and the destination is a topic, inhibits the
1116         *                delivery of messages published by its own connection. The
1117         *                behavior for <CODE>NoLocal</CODE> is not specified if
1118         *                the destination is a queue.
1119         * @param messageListener the listener to use for async consumption of messages
1120         * @return the MessageConsumer
1121         * @throws JMSException if the session fails to create a MessageConsumer due
1122         *                 to some internal error.
1123         * @throws InvalidDestinationException if an invalid destination is
1124         *                 specified.
1125         * @throws InvalidSelectorException if the message selector is invalid.
1126         * @since 1.1
1127         */
1128        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1129            checkClosed();
1130    
1131            if (destination instanceof CustomDestination) {
1132                CustomDestination customDestination = (CustomDestination)destination;
1133                return customDestination.createConsumer(this, messageSelector, noLocal);
1134            }
1135    
1136            ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1137            int prefetch = 0;
1138            if (destination instanceof Topic) {
1139                prefetch = prefetchPolicy.getTopicPrefetch();
1140            } else {
1141                prefetch = prefetchPolicy.getQueuePrefetch();
1142            }
1143            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1144            return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1145                    prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1146        }
1147    
1148        /**
1149         * Creates a queue identity given a <CODE>Queue</CODE> name.
1150         * <P>
1151         * This facility is provided for the rare cases where clients need to
1152         * dynamically manipulate queue identity. It allows the creation of a queue
1153         * identity with a provider-specific name. Clients that depend on this
1154         * ability are not portable.
1155         * <P>
1156         * Note that this method is not for creating the physical queue. The
1157         * physical creation of queues is an administrative task and is not to be
1158         * initiated by the JMS API. The one exception is the creation of temporary
1159         * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1160         * method.
1161         *
1162         * @param queueName the name of this <CODE>Queue</CODE>
1163         * @return a <CODE>Queue</CODE> with the given name
1164         * @throws JMSException if the session fails to create a queue due to some
1165         *                 internal error.
1166         * @since 1.1
1167         */
1168        public Queue createQueue(String queueName) throws JMSException {
1169            checkClosed();
1170            if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1171                return new ActiveMQTempQueue(queueName);
1172            }
1173            return new ActiveMQQueue(queueName);
1174        }
1175    
1176        /**
1177         * Creates a topic identity given a <CODE>Topic</CODE> name.
1178         * <P>
1179         * This facility is provided for the rare cases where clients need to
1180         * dynamically manipulate topic identity. This allows the creation of a
1181         * topic identity with a provider-specific name. Clients that depend on this
1182         * ability are not portable.
1183         * <P>
1184         * Note that this method is not for creating the physical topic. The
1185         * physical creation of topics is an administrative task and is not to be
1186         * initiated by the JMS API. The one exception is the creation of temporary
1187         * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1188         * method.
1189         *
1190         * @param topicName the name of this <CODE>Topic</CODE>
1191         * @return a <CODE>Topic</CODE> with the given name
1192         * @throws JMSException if the session fails to create a topic due to some
1193         *                 internal error.
1194         * @since 1.1
1195         */
1196        public Topic createTopic(String topicName) throws JMSException {
1197            checkClosed();
1198            if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1199                return new ActiveMQTempTopic(topicName);
1200            }
1201            return new ActiveMQTopic(topicName);
1202        }
1203    
1204        /**
1205         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1206         * the specified queue.
1207         *
1208         * @param queue the <CODE>queue</CODE> to access
1209         * @exception InvalidDestinationException if an invalid destination is
1210         *                    specified
1211         * @since 1.1
1212         */
1213        /**
1214         * Creates a durable subscriber to the specified topic.
1215         * <P>
1216         * If a client needs to receive all the messages published on a topic,
1217         * including the ones published while the subscriber is inactive, it uses a
1218         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1219         * record of this durable subscription and insures that all messages from
1220         * the topic's publishers are retained until they are acknowledged by this
1221         * durable subscriber or they have expired.
1222         * <P>
1223         * Sessions with durable subscribers must always provide the same client
1224         * identifier. In addition, each client must specify a name that uniquely
1225         * identifies (within client identifier) each durable subscription it
1226         * creates. Only one session at a time can have a
1227         * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1228         * <P>
1229         * A client can change an existing durable subscription by creating a
1230         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1231         * and/or message selector. Changing a durable subscriber is equivalent to
1232         * unsubscribing (deleting) the old one and creating a new one.
1233         * <P>
1234         * In some cases, a connection may both publish and subscribe to a topic.
1235         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1236         * inhibit the delivery of messages published by its own connection. The
1237         * default value for this attribute is false.
1238         *
1239         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1240         * @param name the name used to identify this subscription
1241         * @return the TopicSubscriber
1242         * @throws JMSException if the session fails to create a subscriber due to
1243         *                 some internal error.
1244         * @throws InvalidDestinationException if an invalid topic is specified.
1245         * @since 1.1
1246         */
1247        public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1248            checkClosed();
1249            return createDurableSubscriber(topic, name, null, false);
1250        }
1251    
1252        /**
1253         * Creates a durable subscriber to the specified topic, using a message
1254         * selector and specifying whether messages published by its own connection
1255         * should be delivered to it.
1256         * <P>
1257         * If a client needs to receive all the messages published on a topic,
1258         * including the ones published while the subscriber is inactive, it uses a
1259         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1260         * record of this durable subscription and insures that all messages from
1261         * the topic's publishers are retained until they are acknowledged by this
1262         * durable subscriber or they have expired.
1263         * <P>
1264         * Sessions with durable subscribers must always provide the same client
1265         * identifier. In addition, each client must specify a name which uniquely
1266         * identifies (within client identifier) each durable subscription it
1267         * creates. Only one session at a time can have a
1268         * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1269         * inactive durable subscriber is one that exists but does not currently
1270         * have a message consumer associated with it.
1271         * <P>
1272         * A client can change an existing durable subscription by creating a
1273         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1274         * and/or message selector. Changing a durable subscriber is equivalent to
1275         * unsubscribing (deleting) the old one and creating a new one.
1276         *
1277         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1278         * @param name the name used to identify this subscription
1279         * @param messageSelector only messages with properties matching the message
1280         *                selector expression are delivered. A value of null or an
1281         *                empty string indicates that there is no message selector
1282         *                for the message consumer.
1283         * @param noLocal if set, inhibits the delivery of messages published by its
1284         *                own connection
1285         * @return the Queue Browser
1286         * @throws JMSException if the session fails to create a subscriber due to
1287         *                 some internal error.
1288         * @throws InvalidDestinationException if an invalid topic is specified.
1289         * @throws InvalidSelectorException if the message selector is invalid.
1290         * @since 1.1
1291         */
1292        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1293            checkClosed();
1294    
1295            if (topic == null) {
1296                throw new InvalidDestinationException("Topic cannot be null");
1297            }
1298    
1299            if (isIndividualAcknowledge()) {
1300                throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
1301                                                 "INDIVIDUAL_ACKNOWLEDGE mode.", null);
1302            }
1303    
1304            if (topic instanceof CustomDestination) {
1305                CustomDestination customDestination = (CustomDestination)topic;
1306                return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1307            }
1308    
1309            connection.checkClientIDWasManuallySpecified();
1310            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1311            int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1312            int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1313            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1314                                               noLocal, false, asyncDispatch);
1315        }
1316    
1317        /**
1318         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1319         * the specified queue.
1320         *
1321         * @param queue the <CODE>queue</CODE> to access
1322         * @return the Queue Browser
1323         * @throws JMSException if the session fails to create a browser due to some
1324         *                 internal error.
1325         * @throws InvalidDestinationException if an invalid destination is
1326         *                 specified
1327         * @since 1.1
1328         */
1329        public QueueBrowser createBrowser(Queue queue) throws JMSException {
1330            checkClosed();
1331            return createBrowser(queue, null);
1332        }
1333    
1334        /**
1335         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1336         * the specified queue using a message selector.
1337         *
1338         * @param queue the <CODE>queue</CODE> to access
1339         * @param messageSelector only messages with properties matching the message
1340         *                selector expression are delivered. A value of null or an
1341         *                empty string indicates that there is no message selector
1342         *                for the message consumer.
1343         * @return the Queue Browser
1344         * @throws JMSException if the session fails to create a browser due to some
1345         *                 internal error.
1346         * @throws InvalidDestinationException if an invalid destination is
1347         *                 specified
1348         * @throws InvalidSelectorException if the message selector is invalid.
1349         * @since 1.1
1350         */
1351        public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1352            checkClosed();
1353            return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1354        }
1355    
1356        /**
1357         * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1358         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1359         *
1360         * @return a temporary queue identity
1361         * @throws JMSException if the session fails to create a temporary queue due
1362         *                 to some internal error.
1363         * @since 1.1
1364         */
1365        public TemporaryQueue createTemporaryQueue() throws JMSException {
1366            checkClosed();
1367            return (TemporaryQueue)connection.createTempDestination(false);
1368        }
1369    
1370        /**
1371         * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1372         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1373         *
1374         * @return a temporary topic identity
1375         * @throws JMSException if the session fails to create a temporary topic due
1376         *                 to some internal error.
1377         * @since 1.1
1378         */
1379        public TemporaryTopic createTemporaryTopic() throws JMSException {
1380            checkClosed();
1381            return (TemporaryTopic)connection.createTempDestination(true);
1382        }
1383    
1384        /**
1385         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1386         * the specified queue.
1387         *
1388         * @param queue the <CODE>Queue</CODE> to access
1389         * @return
1390         * @throws JMSException if the session fails to create a receiver due to
1391         *                 some internal error.
1392         * @throws JMSException
1393         * @throws InvalidDestinationException if an invalid queue is specified.
1394         */
1395        public QueueReceiver createReceiver(Queue queue) throws JMSException {
1396            checkClosed();
1397            return createReceiver(queue, null);
1398        }
1399    
1400        /**
1401         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1402         * the specified queue using a message selector.
1403         *
1404         * @param queue the <CODE>Queue</CODE> to access
1405         * @param messageSelector only messages with properties matching the message
1406         *                selector expression are delivered. A value of null or an
1407         *                empty string indicates that there is no message selector
1408         *                for the message consumer.
1409         * @return QueueReceiver
1410         * @throws JMSException if the session fails to create a receiver due to
1411         *                 some internal error.
1412         * @throws InvalidDestinationException if an invalid queue is specified.
1413         * @throws InvalidSelectorException if the message selector is invalid.
1414         */
1415        public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1416            checkClosed();
1417    
1418            if (queue instanceof CustomDestination) {
1419                CustomDestination customDestination = (CustomDestination)queue;
1420                return customDestination.createReceiver(this, messageSelector);
1421            }
1422    
1423            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1424            return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1425                                             prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1426        }
1427    
1428        /**
1429         * Creates a <CODE>QueueSender</CODE> object to send messages to the
1430         * specified queue.
1431         *
1432         * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1433         *                unidentified producer
1434         * @return QueueSender
1435         * @throws JMSException if the session fails to create a sender due to some
1436         *                 internal error.
1437         * @throws InvalidDestinationException if an invalid queue is specified.
1438         */
1439        public QueueSender createSender(Queue queue) throws JMSException {
1440            checkClosed();
1441            if (queue instanceof CustomDestination) {
1442                CustomDestination customDestination = (CustomDestination)queue;
1443                return customDestination.createSender(this);
1444            }
1445            int timeSendOut = connection.getSendTimeout();
1446            return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1447        }
1448    
1449        /**
1450         * Creates a nondurable subscriber to the specified topic. <p/>
1451         * <P>
1452         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1453         * that have been published to a topic. <p/>
1454         * <P>
1455         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1456         * receive only messages that are published while they are active. <p/>
1457         * <P>
1458         * In some cases, a connection may both publish and subscribe to a topic.
1459         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1460         * inhibit the delivery of messages published by its own connection. The
1461         * default value for this attribute is false.
1462         *
1463         * @param topic the <CODE>Topic</CODE> to subscribe to
1464         * @return TopicSubscriber
1465         * @throws JMSException if the session fails to create a subscriber due to
1466         *                 some internal error.
1467         * @throws InvalidDestinationException if an invalid topic is specified.
1468         */
1469        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1470            checkClosed();
1471            return createSubscriber(topic, null, false);
1472        }
1473    
1474        /**
1475         * Creates a nondurable subscriber to the specified topic, using a message
1476         * selector or specifying whether messages published by its own connection
1477         * should be delivered to it. <p/>
1478         * <P>
1479         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1480         * that have been published to a topic. <p/>
1481         * <P>
1482         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1483         * receive only messages that are published while they are active. <p/>
1484         * <P>
1485         * Messages filtered out by a subscriber's message selector will never be
1486         * delivered to the subscriber. From the subscriber's perspective, they do
1487         * not exist. <p/>
1488         * <P>
1489         * In some cases, a connection may both publish and subscribe to a topic.
1490         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1491         * inhibit the delivery of messages published by its own connection. The
1492         * default value for this attribute is false.
1493         *
1494         * @param topic the <CODE>Topic</CODE> to subscribe to
1495         * @param messageSelector only messages with properties matching the message
1496         *                selector expression are delivered. A value of null or an
1497         *                empty string indicates that there is no message selector
1498         *                for the message consumer.
1499         * @param noLocal if set, inhibits the delivery of messages published by its
1500         *                own connection
1501         * @return TopicSubscriber
1502         * @throws JMSException if the session fails to create a subscriber due to
1503         *                 some internal error.
1504         * @throws InvalidDestinationException if an invalid topic is specified.
1505         * @throws InvalidSelectorException if the message selector is invalid.
1506         */
1507        public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1508            checkClosed();
1509    
1510            if (topic instanceof CustomDestination) {
1511                CustomDestination customDestination = (CustomDestination)topic;
1512                return customDestination.createSubscriber(this, messageSelector, noLocal);
1513            }
1514    
1515            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1516            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1517                .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1518        }
1519    
1520        /**
1521         * Creates a publisher for the specified topic. <p/>
1522         * <P>
1523         * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1524         * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1525         * a topic, it defines a new sequence of messages that have no ordering
1526         * relationship with the messages it has previously sent.
1527         *
1528         * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1529         *                an unidentified producer
1530         * @return TopicPublisher
1531         * @throws JMSException if the session fails to create a publisher due to
1532         *                 some internal error.
1533         * @throws InvalidDestinationException if an invalid topic is specified.
1534         */
1535        public TopicPublisher createPublisher(Topic topic) throws JMSException {
1536            checkClosed();
1537    
1538            if (topic instanceof CustomDestination) {
1539                CustomDestination customDestination = (CustomDestination)topic;
1540                return customDestination.createPublisher(this);
1541            }
1542            int timeSendOut = connection.getSendTimeout();
1543            return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1544        }
1545    
1546        /**
1547         * Unsubscribes a durable subscription that has been created by a client.
1548         * <P>
1549         * This method deletes the state being maintained on behalf of the
1550         * subscriber by its provider.
1551         * <P>
1552         * It is erroneous for a client to delete a durable subscription while there
1553         * is an active <CODE>MessageConsumer </CODE> or
1554         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1555         * message is part of a pending transaction or has not been acknowledged in
1556         * the session.
1557         *
1558         * @param name the name used to identify this subscription
1559         * @throws JMSException if the session fails to unsubscribe to the durable
1560         *                 subscription due to some internal error.
1561         * @throws InvalidDestinationException if an invalid subscription name is
1562         *                 specified.
1563         * @since 1.1
1564         */
1565        public void unsubscribe(String name) throws JMSException {
1566            checkClosed();
1567            connection.unsubscribe(name);
1568        }
1569    
1570        public void dispatch(MessageDispatch messageDispatch) {
1571            try {
1572                executor.execute(messageDispatch);
1573            } catch (InterruptedException e) {
1574                Thread.currentThread().interrupt();
1575                connection.onClientInternalException(e);
1576            }
1577        }
1578    
1579        /**
1580         * Acknowledges all consumed messages of the session of this consumed
1581         * message.
1582         * <P>
1583         * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1584         * for use when a client has specified that its JMS session's consumed
1585         * messages are to be explicitly acknowledged. By invoking
1586         * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1587         * all messages consumed by the session that the message was delivered to.
1588         * <P>
1589         * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1590         * sessions and sessions specified to use implicit acknowledgement modes.
1591         * <P>
1592         * A client may individually acknowledge each message as it is consumed, or
1593         * it may choose to acknowledge messages as an application-defined group
1594         * (which is done by calling acknowledge on the last received message of the
1595         * group, thereby acknowledging all messages consumed by the session.)
1596         * <P>
1597         * Messages that have been received but not acknowledged may be redelivered.
1598         *
1599         * @throws JMSException if the JMS provider fails to acknowledge the
1600         *                 messages due to some internal error.
1601         * @throws javax.jms.IllegalStateException if this method is called on a
1602         *                 closed session.
1603         * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1604         */
1605        public void acknowledge() throws JMSException {
1606            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1607                ActiveMQMessageConsumer c = iter.next();
1608                c.acknowledge();
1609            }
1610        }
1611    
1612        /**
1613         * Add a message consumer.
1614         *
1615         * @param consumer - message consumer.
1616         * @throws JMSException
1617         */
1618        protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1619            this.consumers.add(consumer);
1620            if (consumer.isDurableSubscriber()) {
1621                stats.onCreateDurableSubscriber();
1622            }
1623            this.connection.addDispatcher(consumer.getConsumerId(), this);
1624        }
1625    
1626        /**
1627         * Remove the message consumer.
1628         *
1629         * @param consumer - consumer to be removed.
1630         * @throws JMSException
1631         */
1632        protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1633            this.connection.removeDispatcher(consumer.getConsumerId());
1634            if (consumer.isDurableSubscriber()) {
1635                stats.onRemoveDurableSubscriber();
1636            }
1637            this.consumers.remove(consumer);
1638            this.connection.removeDispatcher(consumer);
1639        }
1640    
1641        /**
1642         * Adds a message producer.
1643         *
1644         * @param producer - message producer to be added.
1645         * @throws JMSException
1646         */
1647        protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1648            this.producers.add(producer);
1649            this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1650        }
1651    
1652        /**
1653         * Removes a message producer.
1654         *
1655         * @param producer - message producer to be removed.
1656         * @throws JMSException
1657         */
1658        protected void removeProducer(ActiveMQMessageProducer producer) {
1659            this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1660            this.producers.remove(producer);
1661        }
1662    
1663        /**
1664         * Start this Session.
1665         *
1666         * @throws JMSException
1667         */
1668        protected void start() throws JMSException {
1669            started.set(true);
1670            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1671                ActiveMQMessageConsumer c = iter.next();
1672                c.start();
1673            }
1674            executor.start();
1675        }
1676    
1677        /**
1678         * Stops this session.
1679         *
1680         * @throws JMSException
1681         */
1682        protected void stop() throws JMSException {
1683    
1684            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1685                ActiveMQMessageConsumer c = iter.next();
1686                c.stop();
1687            }
1688    
1689            started.set(false);
1690            executor.stop();
1691        }
1692    
1693        /**
1694         * Returns the session id.
1695         *
1696         * @return value - session id.
1697         */
1698        protected SessionId getSessionId() {
1699            return info.getSessionId();
1700        }
1701    
1702        /**
1703         * @return
1704         */
1705        protected ConsumerId getNextConsumerId() {
1706            return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1707        }
1708    
1709        /**
1710         * @return
1711         */
1712        protected ProducerId getNextProducerId() {
1713            return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1714        }
1715    
1716        /**
1717         * Sends the message for dispatch by the broker.
1718         *
1719         *
1720         * @param producer - message producer.
1721         * @param destination - message destination.
1722         * @param message - message to be sent.
1723         * @param deliveryMode - JMS messsage delivery mode.
1724         * @param priority - message priority.
1725         * @param timeToLive - message expiration.
1726         * @param producerWindow
1727         * @param onComplete
1728         * @throws JMSException
1729         */
1730        protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1731                            MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1732    
1733            checkClosed();
1734            if (destination.isTemporary() && connection.isDeleted(destination)) {
1735                throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1736            }
1737            synchronized (sendMutex) {
1738                // tell the Broker we are about to start a new transaction
1739                doStartTransaction();
1740                TransactionId txid = transactionContext.getTransactionId();
1741                long sequenceNumber = producer.getMessageSequence();
1742    
1743                //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1744                message.setJMSDeliveryMode(deliveryMode);
1745                long expiration = 0L;
1746                if (!producer.getDisableMessageTimestamp()) {
1747                    long timeStamp = System.currentTimeMillis();
1748                    message.setJMSTimestamp(timeStamp);
1749                    if (timeToLive > 0) {
1750                        expiration = timeToLive + timeStamp;
1751                    }
1752                }
1753                message.setJMSExpiration(expiration);
1754                message.setJMSPriority(priority);
1755                message.setJMSRedelivered(false);
1756    
1757                // transform to our own message format here
1758                ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1759                msg.setDestination(destination);
1760                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1761    
1762                // Set the message id.
1763                if (msg != message) {
1764                    message.setJMSMessageID(msg.getMessageId().toString());
1765                    // Make sure the JMS destination is set on the foreign messages too.
1766                    message.setJMSDestination(destination);
1767                }
1768                //clear the brokerPath in case we are re-sending this message
1769                msg.setBrokerPath(null);
1770    
1771                msg.setTransactionId(txid);
1772                if (connection.isCopyMessageOnSend()) {
1773                    msg = (ActiveMQMessage)msg.copy();
1774                }
1775                msg.setConnection(connection);
1776                msg.onSend();
1777                msg.setProducerId(msg.getMessageId().getProducerId());
1778                if (LOG.isTraceEnabled()) {
1779                    LOG.trace(getSessionId() + " sending message: " + msg);
1780                }
1781                if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1782                    this.connection.asyncSendPacket(msg);
1783                    if (producerWindow != null) {
1784                        // Since we defer lots of the marshaling till we hit the
1785                        // wire, this might not
1786                        // provide and accurate size. We may change over to doing
1787                        // more aggressive marshaling,
1788                        // to get more accurate sizes.. this is more important once
1789                        // users start using producer window
1790                        // flow control.
1791                        int size = msg.getSize();
1792                        producerWindow.increaseUsage(size);
1793                    }
1794                } else {
1795                    if (sendTimeout > 0 && onComplete==null) {
1796                        this.connection.syncSendPacket(msg,sendTimeout);
1797                    }else {
1798                        this.connection.syncSendPacket(msg, onComplete);
1799                    }
1800                }
1801    
1802            }
1803        }
1804    
1805        /**
1806         * Send TransactionInfo to indicate transaction has started
1807         *
1808         * @throws JMSException if some internal error occurs
1809         */
1810        protected void doStartTransaction() throws JMSException {
1811            if (getTransacted() && !transactionContext.isInXATransaction()) {
1812                transactionContext.begin();
1813            }
1814        }
1815    
1816        /**
1817         * Checks whether the session has unconsumed messages.
1818         *
1819         * @return true - if there are unconsumed messages.
1820         */
1821        public boolean hasUncomsumedMessages() {
1822            return executor.hasUncomsumedMessages();
1823        }
1824    
1825        /**
1826         * Checks whether the session uses transactions.
1827         *
1828         * @return true - if the session uses transactions.
1829         */
1830        public boolean isTransacted() {
1831            return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1832        }
1833    
1834        /**
1835         * Checks whether the session used client acknowledgment.
1836         *
1837         * @return true - if the session uses client acknowledgment.
1838         */
1839        protected boolean isClientAcknowledge() {
1840            return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1841        }
1842    
1843        /**
1844         * Checks whether the session used auto acknowledgment.
1845         *
1846         * @return true - if the session uses client acknowledgment.
1847         */
1848        public boolean isAutoAcknowledge() {
1849            return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1850        }
1851    
1852        /**
1853         * Checks whether the session used dup ok acknowledgment.
1854         *
1855         * @return true - if the session uses client acknowledgment.
1856         */
1857        public boolean isDupsOkAcknowledge() {
1858            return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1859        }
1860    
1861        public boolean isIndividualAcknowledge(){
1862            return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1863        }
1864    
1865        /**
1866         * Returns the message delivery listener.
1867         *
1868         * @return deliveryListener - message delivery listener.
1869         */
1870        public DeliveryListener getDeliveryListener() {
1871            return deliveryListener;
1872        }
1873    
1874        /**
1875         * Sets the message delivery listener.
1876         *
1877         * @param deliveryListener - message delivery listener.
1878         */
1879        public void setDeliveryListener(DeliveryListener deliveryListener) {
1880            this.deliveryListener = deliveryListener;
1881        }
1882    
1883        /**
1884         * Returns the SessionInfo bean.
1885         *
1886         * @return info - SessionInfo bean.
1887         * @throws JMSException
1888         */
1889        protected SessionInfo getSessionInfo() throws JMSException {
1890            SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1891            return info;
1892        }
1893    
1894        /**
1895         * Send the asynchronus command.
1896         *
1897         * @param command - command to be executed.
1898         * @throws JMSException
1899         */
1900        public void asyncSendPacket(Command command) throws JMSException {
1901            connection.asyncSendPacket(command);
1902        }
1903    
1904        /**
1905         * Send the synchronus command.
1906         *
1907         * @param command - command to be executed.
1908         * @return Response
1909         * @throws JMSException
1910         */
1911        public Response syncSendPacket(Command command) throws JMSException {
1912            return connection.syncSendPacket(command);
1913        }
1914    
1915        public long getNextDeliveryId() {
1916            return deliveryIdGenerator.getNextSequenceId();
1917        }
1918    
1919        public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1920    
1921            List<MessageDispatch> c = unconsumedMessages.removeAll();
1922            for (MessageDispatch md : c) {
1923                this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1924            }
1925            Collections.reverse(c);
1926    
1927            for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1928                MessageDispatch md = iter.next();
1929                executor.executeFirst(md);
1930            }
1931    
1932        }
1933    
1934        public boolean isRunning() {
1935            return started.get();
1936        }
1937    
1938        public boolean isAsyncDispatch() {
1939            return asyncDispatch;
1940        }
1941    
1942        public void setAsyncDispatch(boolean asyncDispatch) {
1943            this.asyncDispatch = asyncDispatch;
1944        }
1945    
1946        /**
1947         * @return Returns the sessionAsyncDispatch.
1948         */
1949        public boolean isSessionAsyncDispatch() {
1950            return sessionAsyncDispatch;
1951        }
1952    
1953        /**
1954         * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1955         */
1956        public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1957            this.sessionAsyncDispatch = sessionAsyncDispatch;
1958        }
1959    
1960        public MessageTransformer getTransformer() {
1961            return transformer;
1962        }
1963    
1964        public ActiveMQConnection getConnection() {
1965            return connection;
1966        }
1967    
1968        /**
1969         * Sets the transformer used to transform messages before they are sent on
1970         * to the JMS bus or when they are received from the bus but before they are
1971         * delivered to the JMS client
1972         */
1973        public void setTransformer(MessageTransformer transformer) {
1974            this.transformer = transformer;
1975        }
1976    
1977        public BlobTransferPolicy getBlobTransferPolicy() {
1978            return blobTransferPolicy;
1979        }
1980    
1981        /**
1982         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1983         * OBjects) are transferred from producers to brokers to consumers
1984         */
1985        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1986            this.blobTransferPolicy = blobTransferPolicy;
1987        }
1988    
1989        public List<MessageDispatch> getUnconsumedMessages() {
1990            return executor.getUnconsumedMessages();
1991        }
1992    
1993        @Override
1994        public String toString() {
1995            return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1996        }
1997    
1998        public void checkMessageListener() throws JMSException {
1999            if (messageListener != null) {
2000                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2001            }
2002            for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
2003                ActiveMQMessageConsumer consumer = i.next();
2004                if (consumer.getMessageListener() != null) {
2005                    throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2006                }
2007            }
2008        }
2009    
2010        protected void setOptimizeAcknowledge(boolean value) {
2011            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2012                ActiveMQMessageConsumer c = iter.next();
2013                c.setOptimizeAcknowledge(value);
2014            }
2015        }
2016    
2017        protected void setPrefetchSize(ConsumerId id, int prefetch) {
2018            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2019                ActiveMQMessageConsumer c = iter.next();
2020                if (c.getConsumerId().equals(id)) {
2021                    c.setPrefetchSize(prefetch);
2022                    break;
2023                }
2024            }
2025        }
2026    
2027        protected void close(ConsumerId id) {
2028            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2029                ActiveMQMessageConsumer c = iter.next();
2030                if (c.getConsumerId().equals(id)) {
2031                    try {
2032                        c.close();
2033                    } catch (JMSException e) {
2034                        LOG.warn("Exception closing consumer", e);
2035                    }
2036                    LOG.warn("Closed consumer on Command");
2037                    break;
2038                }
2039            }
2040        }
2041    
2042        public boolean isInUse(ActiveMQTempDestination destination) {
2043            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2044                ActiveMQMessageConsumer c = iter.next();
2045                if (c.isInUse(destination)) {
2046                    return true;
2047                }
2048            }
2049            return false;
2050        }
2051    
2052        /**
2053         * highest sequence id of the last message delivered by this session.
2054         * Passed to the broker in the close command, maintained by dispose()
2055         * @return lastDeliveredSequenceId
2056         */
2057        public long getLastDeliveredSequenceId() {
2058            return lastDeliveredSequenceId;
2059        }
2060    
2061        protected void sendAck(MessageAck ack) throws JMSException {
2062            sendAck(ack,false);
2063        }
2064    
2065        protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2066            if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2067                asyncSendPacket(ack);
2068            } else {
2069                syncSendPacket(ack);
2070            }
2071        }
2072    
2073        protected Scheduler getScheduler() throws JMSException {
2074            return this.connection.getScheduler();
2075        }
2076    
2077        protected ThreadPoolExecutor getConnectionExecutor() {
2078            return this.connectionExecutor;
2079        }
2080    }