001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.File;
020    import java.io.InputStream;
021    import java.io.Serializable;
022    import java.net.URL;
023    import java.util.Collections;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.concurrent.CopyOnWriteArrayList;
027    import java.util.concurrent.ThreadPoolExecutor;
028    import java.util.concurrent.atomic.AtomicBoolean;
029    import java.util.concurrent.atomic.AtomicInteger;
030    
031    import javax.jms.BytesMessage;
032    import javax.jms.Destination;
033    import javax.jms.IllegalStateException;
034    import javax.jms.InvalidDestinationException;
035    import javax.jms.InvalidSelectorException;
036    import javax.jms.JMSException;
037    import javax.jms.MapMessage;
038    import javax.jms.Message;
039    import javax.jms.MessageConsumer;
040    import javax.jms.MessageListener;
041    import javax.jms.MessageProducer;
042    import javax.jms.ObjectMessage;
043    import javax.jms.Queue;
044    import javax.jms.QueueBrowser;
045    import javax.jms.QueueReceiver;
046    import javax.jms.QueueSender;
047    import javax.jms.QueueSession;
048    import javax.jms.Session;
049    import javax.jms.StreamMessage;
050    import javax.jms.TemporaryQueue;
051    import javax.jms.TemporaryTopic;
052    import javax.jms.TextMessage;
053    import javax.jms.Topic;
054    import javax.jms.TopicPublisher;
055    import javax.jms.TopicSession;
056    import javax.jms.TopicSubscriber;
057    import javax.jms.TransactionRolledBackException;
058    
059    import org.apache.activemq.blob.BlobDownloader;
060    import org.apache.activemq.blob.BlobTransferPolicy;
061    import org.apache.activemq.blob.BlobUploader;
062    import org.apache.activemq.command.ActiveMQBlobMessage;
063    import org.apache.activemq.command.ActiveMQBytesMessage;
064    import org.apache.activemq.command.ActiveMQDestination;
065    import org.apache.activemq.command.ActiveMQMapMessage;
066    import org.apache.activemq.command.ActiveMQMessage;
067    import org.apache.activemq.command.ActiveMQObjectMessage;
068    import org.apache.activemq.command.ActiveMQQueue;
069    import org.apache.activemq.command.ActiveMQStreamMessage;
070    import org.apache.activemq.command.ActiveMQTempDestination;
071    import org.apache.activemq.command.ActiveMQTempQueue;
072    import org.apache.activemq.command.ActiveMQTempTopic;
073    import org.apache.activemq.command.ActiveMQTextMessage;
074    import org.apache.activemq.command.ActiveMQTopic;
075    import org.apache.activemq.command.Command;
076    import org.apache.activemq.command.ConsumerId;
077    import org.apache.activemq.command.MessageAck;
078    import org.apache.activemq.command.MessageDispatch;
079    import org.apache.activemq.command.MessageId;
080    import org.apache.activemq.command.ProducerId;
081    import org.apache.activemq.command.RemoveInfo;
082    import org.apache.activemq.command.Response;
083    import org.apache.activemq.command.SessionId;
084    import org.apache.activemq.command.SessionInfo;
085    import org.apache.activemq.command.TransactionId;
086    import org.apache.activemq.management.JMSSessionStatsImpl;
087    import org.apache.activemq.management.StatsCapable;
088    import org.apache.activemq.management.StatsImpl;
089    import org.apache.activemq.thread.Scheduler;
090    import org.apache.activemq.transaction.Synchronization;
091    import org.apache.activemq.usage.MemoryUsage;
092    import org.apache.activemq.util.Callback;
093    import org.apache.activemq.util.JMSExceptionSupport;
094    import org.apache.activemq.util.LongSequenceGenerator;
095    import org.slf4j.Logger;
096    import org.slf4j.LoggerFactory;
097    
098    /**
099     * <P>
100     * A <CODE>Session</CODE> object is a single-threaded context for producing
101     * and consuming messages. Although it may allocate provider resources outside
102     * the Java virtual machine (JVM), it is considered a lightweight JMS object.
103     * <P>
104     * A session serves several purposes:
105     * <UL>
106     * <LI>It is a factory for its message producers and consumers.
107     * <LI>It supplies provider-optimized message factories.
108     * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
109     * <CODE>TemporaryQueues</CODE>.
110     * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
111     * objects for those clients that need to dynamically manipulate
112     * provider-specific destination names.
113     * <LI>It supports a single series of transactions that combine work spanning
114     * its producers and consumers into atomic units.
115     * <LI>It defines a serial order for the messages it consumes and the messages
116     * it produces.
117     * <LI>It retains messages it consumes until they have been acknowledged.
118     * <LI>It serializes execution of message listeners registered with its message
119     * consumers.
120     * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
121     * </UL>
122     * <P>
123     * A session can create and service multiple message producers and consumers.
124     * <P>
125     * One typical use is to have a thread block on a synchronous
126     * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
127     * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
128     * <P>
129     * If a client desires to have one thread produce messages while others consume
130     * them, the client should use a separate session for its producing thread.
131     * <P>
132     * Once a connection has been started, any session with one or more registered
133     * message listeners is dedicated to the thread of control that delivers
134     * messages to it. It is erroneous for client code to use this session or any of
135     * its constituent objects from another thread of control. The only exception to
136     * this rule is the use of the session or connection <CODE>close</CODE>
137     * method.
138     * <P>
139     * It should be easy for most clients to partition their work naturally into
140     * sessions. This model allows clients to start simply and incrementally add
141     * message processing complexity as their need for concurrency grows.
142     * <P>
143     * The <CODE>close</CODE> method is the only session method that can be called
144     * while some other session method is being executed in another thread.
145     * <P>
146     * A session may be specified as transacted. Each transacted session supports a
147     * single series of transactions. Each transaction groups a set of message sends
148     * and a set of message receives into an atomic unit of work. In effect,
149     * transactions organize a session's input message stream and output message
150     * stream into series of atomic units. When a transaction commits, its atomic
151     * unit of input is acknowledged and its associated atomic unit of output is
152     * sent. If a transaction rollback is done, the transaction's sent messages are
153     * destroyed and the session's input is automatically recovered.
154     * <P>
155     * The content of a transaction's input and output units is simply those
156     * messages that have been produced and consumed within the session's current
157     * transaction.
158     * <P>
159     * A transaction is completed using either its session's <CODE>commit</CODE>
160     * method or its session's <CODE>rollback </CODE> method. The completion of a
161     * session's current transaction automatically begins the next. The result is
162     * that a transacted session always has a current transaction within which its
163     * work is done.
164     * <P>
165     * The Java Transaction Service (JTS) or some other transaction monitor may be
166     * used to combine a session's transaction with transactions on other resources
167     * (databases, other JMS sessions, etc.). Since Java distributed transactions
168     * are controlled via the Java Transaction API (JTA), use of the session's
169     * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
170     * prohibited.
171     * <P>
172     * The JMS API does not require support for JTA; however, it does define how a
173     * provider supplies this support.
174     * <P>
175     * Although it is also possible for a JMS client to handle distributed
176     * transactions directly, it is unlikely that many JMS clients will do this.
177     * Support for JTA in the JMS API is targeted at systems vendors who will be
178     * integrating the JMS API into their application server products.
179     *
180     *
181     * @see javax.jms.Session
182     * @see javax.jms.QueueSession
183     * @see javax.jms.TopicSession
184     * @see javax.jms.XASession
185     */
186    public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
187    
188        /**
189         * Only acknowledge an individual message - using message.acknowledge()
190         * as opposed to CLIENT_ACKNOWLEDGE which
191         * acknowledges all messages consumed by a session at when acknowledge()
192         * is called
193         */
194        public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
195        public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
196    
197        public static interface DeliveryListener {
198            void beforeDelivery(ActiveMQSession session, Message msg);
199    
200            void afterDelivery(ActiveMQSession session, Message msg);
201        }
202    
203        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
204        private final ThreadPoolExecutor connectionExecutor;
205    
206        protected int acknowledgementMode;
207        protected final ActiveMQConnection connection;
208        protected final SessionInfo info;
209        protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
210        protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
211        protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
212        protected final ActiveMQSessionExecutor executor;
213        protected final AtomicBoolean started = new AtomicBoolean(false);
214    
215        protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
216        protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
217    
218        protected boolean closed;
219        private volatile boolean synchronizationRegistered;
220        protected boolean asyncDispatch;
221        protected boolean sessionAsyncDispatch;
222        protected final boolean debug;
223        protected Object sendMutex = new Object();
224    
225        private MessageListener messageListener;
226        private final JMSSessionStatsImpl stats;
227        private TransactionContext transactionContext;
228        private DeliveryListener deliveryListener;
229        private MessageTransformer transformer;
230        private BlobTransferPolicy blobTransferPolicy;
231        private long lastDeliveredSequenceId;
232    
233        /**
234         * Construct the Session
235         *
236         * @param connection
237         * @param sessionId
238         * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
239         *                Session.SESSION_TRANSACTED
240         * @param asyncDispatch
241         * @param sessionAsyncDispatch
242         * @throws JMSException on internal error
243         */
244        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
245            this.debug = LOG.isDebugEnabled();
246            this.connection = connection;
247            this.acknowledgementMode = acknowledgeMode;
248            this.asyncDispatch = asyncDispatch;
249            this.sessionAsyncDispatch = sessionAsyncDispatch;
250            this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
251            setTransactionContext(new TransactionContext(connection));
252            stats = new JMSSessionStatsImpl(producers, consumers);
253            this.connection.asyncSendPacket(info);
254            setTransformer(connection.getTransformer());
255            setBlobTransferPolicy(connection.getBlobTransferPolicy());
256            this.connectionExecutor=connection.getExecutor();
257            this.executor = new ActiveMQSessionExecutor(this);
258            connection.addSession(this);
259            if (connection.isStarted()) {
260                start();
261            }
262    
263        }
264    
265        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
266            this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
267        }
268    
269        /**
270         * Sets the transaction context of the session.
271         *
272         * @param transactionContext - provides the means to control a JMS
273         *                transaction.
274         */
275        public void setTransactionContext(TransactionContext transactionContext) {
276            this.transactionContext = transactionContext;
277        }
278    
279        /**
280         * Returns the transaction context of the session.
281         *
282         * @return transactionContext - session's transaction context.
283         */
284        public TransactionContext getTransactionContext() {
285            return transactionContext;
286        }
287    
288        /*
289         * (non-Javadoc)
290         *
291         * @see org.apache.activemq.management.StatsCapable#getStats()
292         */
293        public StatsImpl getStats() {
294            return stats;
295        }
296    
297        /**
298         * Returns the session's statistics.
299         *
300         * @return stats - session's statistics.
301         */
302        public JMSSessionStatsImpl getSessionStats() {
303            return stats;
304        }
305    
306        /**
307         * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
308         * object is used to send a message containing a stream of uninterpreted
309         * bytes.
310         *
311         * @return the an ActiveMQBytesMessage
312         * @throws JMSException if the JMS provider fails to create this message due
313         *                 to some internal error.
314         */
315        public BytesMessage createBytesMessage() throws JMSException {
316            ActiveMQBytesMessage message = new ActiveMQBytesMessage();
317            configureMessage(message);
318            return message;
319        }
320    
321        /**
322         * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
323         * object is used to send a self-defining set of name-value pairs, where
324         * names are <CODE>String</CODE> objects and values are primitive values
325         * in the Java programming language.
326         *
327         * @return an ActiveMQMapMessage
328         * @throws JMSException if the JMS provider fails to create this message due
329         *                 to some internal error.
330         */
331        public MapMessage createMapMessage() throws JMSException {
332            ActiveMQMapMessage message = new ActiveMQMapMessage();
333            configureMessage(message);
334            return message;
335        }
336    
337        /**
338         * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
339         * interface is the root interface of all JMS messages. A
340         * <CODE>Message</CODE> object holds all the standard message header
341         * information. It can be sent when a message containing only header
342         * information is sufficient.
343         *
344         * @return an ActiveMQMessage
345         * @throws JMSException if the JMS provider fails to create this message due
346         *                 to some internal error.
347         */
348        public Message createMessage() throws JMSException {
349            ActiveMQMessage message = new ActiveMQMessage();
350            configureMessage(message);
351            return message;
352        }
353    
354        /**
355         * Creates an <CODE>ObjectMessage</CODE> object. An
356         * <CODE>ObjectMessage</CODE> object is used to send a message that
357         * contains a serializable Java object.
358         *
359         * @return an ActiveMQObjectMessage
360         * @throws JMSException if the JMS provider fails to create this message due
361         *                 to some internal error.
362         */
363        public ObjectMessage createObjectMessage() throws JMSException {
364            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
365            configureMessage(message);
366            return message;
367        }
368    
369        /**
370         * Creates an initialized <CODE>ObjectMessage</CODE> object. An
371         * <CODE>ObjectMessage</CODE> object is used to send a message that
372         * contains a serializable Java object.
373         *
374         * @param object the object to use to initialize this message
375         * @return an ActiveMQObjectMessage
376         * @throws JMSException if the JMS provider fails to create this message due
377         *                 to some internal error.
378         */
379        public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
380            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
381            configureMessage(message);
382            message.setObject(object);
383            return message;
384        }
385    
386        /**
387         * Creates a <CODE>StreamMessage</CODE> object. A
388         * <CODE>StreamMessage</CODE> object is used to send a self-defining
389         * stream of primitive values in the Java programming language.
390         *
391         * @return an ActiveMQStreamMessage
392         * @throws JMSException if the JMS provider fails to create this message due
393         *                 to some internal error.
394         */
395        public StreamMessage createStreamMessage() throws JMSException {
396            ActiveMQStreamMessage message = new ActiveMQStreamMessage();
397            configureMessage(message);
398            return message;
399        }
400    
401        /**
402         * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
403         * object is used to send a message containing a <CODE>String</CODE>
404         * object.
405         *
406         * @return an ActiveMQTextMessage
407         * @throws JMSException if the JMS provider fails to create this message due
408         *                 to some internal error.
409         */
410        public TextMessage createTextMessage() throws JMSException {
411            ActiveMQTextMessage message = new ActiveMQTextMessage();
412            configureMessage(message);
413            return message;
414        }
415    
416        /**
417         * Creates an initialized <CODE>TextMessage</CODE> object. A
418         * <CODE>TextMessage</CODE> object is used to send a message containing a
419         * <CODE>String</CODE>.
420         *
421         * @param text the string used to initialize this message
422         * @return an ActiveMQTextMessage
423         * @throws JMSException if the JMS provider fails to create this message due
424         *                 to some internal error.
425         */
426        public TextMessage createTextMessage(String text) throws JMSException {
427            ActiveMQTextMessage message = new ActiveMQTextMessage();
428            message.setText(text);
429            configureMessage(message);
430            return message;
431        }
432    
433        /**
434         * Creates an initialized <CODE>BlobMessage</CODE> object. A
435         * <CODE>BlobMessage</CODE> object is used to send a message containing a
436         * <CODE>URL</CODE> which points to some network addressible BLOB.
437         *
438         * @param url the network addressable URL used to pass directly to the
439         *                consumer
440         * @return a BlobMessage
441         * @throws JMSException if the JMS provider fails to create this message due
442         *                 to some internal error.
443         */
444        public BlobMessage createBlobMessage(URL url) throws JMSException {
445            return createBlobMessage(url, false);
446        }
447    
448        /**
449         * Creates an initialized <CODE>BlobMessage</CODE> object. A
450         * <CODE>BlobMessage</CODE> object is used to send a message containing a
451         * <CODE>URL</CODE> which points to some network addressible BLOB.
452         *
453         * @param url the network addressable URL used to pass directly to the
454         *                consumer
455         * @param deletedByBroker indicates whether or not the resource is deleted
456         *                by the broker when the message is acknowledged
457         * @return a BlobMessage
458         * @throws JMSException if the JMS provider fails to create this message due
459         *                 to some internal error.
460         */
461        public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
462            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
463            configureMessage(message);
464            message.setURL(url);
465            message.setDeletedByBroker(deletedByBroker);
466            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
467            return message;
468        }
469    
470        /**
471         * Creates an initialized <CODE>BlobMessage</CODE> object. A
472         * <CODE>BlobMessage</CODE> object is used to send a message containing
473         * the <CODE>File</CODE> content. Before the message is sent the file
474         * conent will be uploaded to the broker or some other remote repository
475         * depending on the {@link #getBlobTransferPolicy()}.
476         *
477         * @param file the file to be uploaded to some remote repo (or the broker)
478         *                depending on the strategy
479         * @return a BlobMessage
480         * @throws JMSException if the JMS provider fails to create this message due
481         *                 to some internal error.
482         */
483        public BlobMessage createBlobMessage(File file) throws JMSException {
484            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
485            configureMessage(message);
486            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
487            message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
488            message.setDeletedByBroker(true);
489            message.setName(file.getName());
490            return message;
491        }
492    
493        /**
494         * Creates an initialized <CODE>BlobMessage</CODE> object. A
495         * <CODE>BlobMessage</CODE> object is used to send a message containing
496         * the <CODE>File</CODE> content. Before the message is sent the file
497         * conent will be uploaded to the broker or some other remote repository
498         * depending on the {@link #getBlobTransferPolicy()}.
499         *
500         * @param in the stream to be uploaded to some remote repo (or the broker)
501         *                depending on the strategy
502         * @return a BlobMessage
503         * @throws JMSException if the JMS provider fails to create this message due
504         *                 to some internal error.
505         */
506        public BlobMessage createBlobMessage(InputStream in) throws JMSException {
507            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
508            configureMessage(message);
509            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
510            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
511            message.setDeletedByBroker(true);
512            return message;
513        }
514    
515        /**
516         * Indicates whether the session is in transacted mode.
517         *
518         * @return true if the session is in transacted mode
519         * @throws JMSException if there is some internal error.
520         */
521        public boolean getTransacted() throws JMSException {
522            checkClosed();
523            return isTransacted();
524        }
525    
526        /**
527         * Returns the acknowledgement mode of the session. The acknowledgement mode
528         * is set at the time that the session is created. If the session is
529         * transacted, the acknowledgement mode is ignored.
530         *
531         * @return If the session is not transacted, returns the current
532         *         acknowledgement mode for the session. If the session is
533         *         transacted, returns SESSION_TRANSACTED.
534         * @throws JMSException
535         * @see javax.jms.Connection#createSession(boolean,int)
536         * @since 1.1 exception JMSException if there is some internal error.
537         */
538        public int getAcknowledgeMode() throws JMSException {
539            checkClosed();
540            return this.acknowledgementMode;
541        }
542    
543        /**
544         * Commits all messages done in this transaction and releases any locks
545         * currently held.
546         *
547         * @throws JMSException if the JMS provider fails to commit the transaction
548         *                 due to some internal error.
549         * @throws TransactionRolledBackException if the transaction is rolled back
550         *                 due to some internal error during commit.
551         * @throws javax.jms.IllegalStateException if the method is not called by a
552         *                 transacted session.
553         */
554        public void commit() throws JMSException {
555            checkClosed();
556            if (!getTransacted()) {
557                throw new javax.jms.IllegalStateException("Not a transacted session");
558            }
559            if (LOG.isDebugEnabled()) {
560                LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
561            }
562            transactionContext.commit();
563        }
564    
565        /**
566         * Rolls back any messages done in this transaction and releases any locks
567         * currently held.
568         *
569         * @throws JMSException if the JMS provider fails to roll back the
570         *                 transaction due to some internal error.
571         * @throws javax.jms.IllegalStateException if the method is not called by a
572         *                 transacted session.
573         */
574        public void rollback() throws JMSException {
575            checkClosed();
576            if (!getTransacted()) {
577                throw new javax.jms.IllegalStateException("Not a transacted session");
578            }
579            if (LOG.isDebugEnabled()) {
580                LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
581            }
582            transactionContext.rollback();
583        }
584    
585        /**
586         * Closes the session.
587         * <P>
588         * Since a provider may allocate some resources on behalf of a session
589         * outside the JVM, clients should close the resources when they are not
590         * needed. Relying on garbage collection to eventually reclaim these
591         * resources may not be timely enough.
592         * <P>
593         * There is no need to close the producers and consumers of a closed
594         * session.
595         * <P>
596         * This call will block until a <CODE>receive</CODE> call or message
597         * listener in progress has completed. A blocked message consumer
598         * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
599         * is closed.
600         * <P>
601         * Closing a transacted session must roll back the transaction in progress.
602         * <P>
603         * This method is the only <CODE>Session</CODE> method that can be called
604         * concurrently.
605         * <P>
606         * Invoking any other <CODE>Session</CODE> method on a closed session must
607         * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
608         * closed session must <I>not </I> throw an exception.
609         *
610         * @throws JMSException if the JMS provider fails to close the session due
611         *                 to some internal error.
612         */
613        public void close() throws JMSException {
614            if (!closed) {
615                if (getTransactionContext().isInXATransaction()) {
616                    if (!synchronizationRegistered) {
617                        synchronizationRegistered = true;
618                        getTransactionContext().addSynchronization(new Synchronization() {
619    
620                                            @Override
621                                            public void afterCommit() throws Exception {
622                                                doClose();
623                                                synchronizationRegistered = false;
624                                            }
625    
626                                            @Override
627                                            public void afterRollback() throws Exception {
628                                                doClose();
629                                                synchronizationRegistered = false;
630                                            }
631                                        });
632                    }
633    
634                } else {
635                    doClose();
636                }
637            }
638        }
639    
640        private void doClose() throws JMSException {
641            boolean interrupted = Thread.interrupted();
642            dispose();
643            RemoveInfo removeCommand = info.createRemoveCommand();
644            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
645            connection.asyncSendPacket(removeCommand);
646            if (interrupted) {
647                Thread.currentThread().interrupt();
648            }
649        }
650    
651        void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
652            executor.clearMessagesInProgress();
653            // we are called from inside the transport reconnection logic
654            // which involves us clearing all the connections' consumers
655            // dispatch and delivered lists. So rather than trying to
656            // grab a mutex (which could be already owned by the message
657            // listener calling the send or an ack) we allow it to complete in
658            // a separate thread via the scheduler and notify us via
659            // connection.transportInterruptionProcessingComplete()
660            //
661            for (final ActiveMQMessageConsumer consumer : consumers) {
662                consumer.inProgressClearRequired();
663                transportInterruptionProcessingComplete.incrementAndGet();
664                try {
665                    connection.getScheduler().executeAfterDelay(new Runnable() {
666                        public void run() {
667                            consumer.clearMessagesInProgress();
668                        }}, 0l);
669                } catch (JMSException e) {
670                    connection.onClientInternalException(e);
671                }
672            }
673        }
674    
675        void deliverAcks() {
676            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
677                ActiveMQMessageConsumer consumer = iter.next();
678                consumer.deliverAcks();
679            }
680        }
681    
682        public synchronized void dispose() throws JMSException {
683            if (!closed) {
684    
685                try {
686                    executor.stop();
687    
688                    for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
689                        ActiveMQMessageConsumer consumer = iter.next();
690                        consumer.setFailureError(connection.getFirstFailureError());
691                        consumer.dispose();
692                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
693                    }
694                    consumers.clear();
695    
696                    for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
697                        ActiveMQMessageProducer producer = iter.next();
698                        producer.dispose();
699                    }
700                    producers.clear();
701    
702                    try {
703                        if (getTransactionContext().isInLocalTransaction()) {
704                            rollback();
705                        }
706                    } catch (JMSException e) {
707                    }
708    
709                } finally {
710                    connection.removeSession(this);
711                    this.transactionContext = null;
712                    closed = true;
713                }
714            }
715        }
716    
717        /**
718         * Checks that the session is not closed then configures the message
719         */
720        protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
721            checkClosed();
722            message.setConnection(connection);
723        }
724    
725        /**
726         * Check if the session is closed. It is used for ensuring that the session
727         * is open before performing various operations.
728         *
729         * @throws IllegalStateException if the Session is closed
730         */
731        protected void checkClosed() throws IllegalStateException {
732            if (closed) {
733                throw new IllegalStateException("The Session is closed");
734            }
735        }
736    
737        /**
738         * Checks if the session is closed.
739         *
740         * @return true if the session is closed, false otherwise.
741         */
742        public boolean isClosed() {
743            return closed;
744        }
745    
746        /**
747         * Stops message delivery in this session, and restarts message delivery
748         * with the oldest unacknowledged message.
749         * <P>
750         * All consumers deliver messages in a serial order. Acknowledging a
751         * received message automatically acknowledges all messages that have been
752         * delivered to the client.
753         * <P>
754         * Restarting a session causes it to take the following actions:
755         * <UL>
756         * <LI>Stop message delivery
757         * <LI>Mark all messages that might have been delivered but not
758         * acknowledged as "redelivered"
759         * <LI>Restart the delivery sequence including all unacknowledged messages
760         * that had been previously delivered. Redelivered messages do not have to
761         * be delivered in exactly their original delivery order.
762         * </UL>
763         *
764         * @throws JMSException if the JMS provider fails to stop and restart
765         *                 message delivery due to some internal error.
766         * @throws IllegalStateException if the method is called by a transacted
767         *                 session.
768         */
769        public void recover() throws JMSException {
770    
771            checkClosed();
772            if (getTransacted()) {
773                throw new IllegalStateException("This session is transacted");
774            }
775    
776            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
777                ActiveMQMessageConsumer c = iter.next();
778                c.rollback();
779            }
780    
781        }
782    
783        /**
784         * Returns the session's distinguished message listener (optional).
785         *
786         * @return the message listener associated with this session
787         * @throws JMSException if the JMS provider fails to get the message
788         *                 listener due to an internal error.
789         * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
790         * @see javax.jms.ServerSessionPool
791         * @see javax.jms.ServerSession
792         */
793        public MessageListener getMessageListener() throws JMSException {
794            checkClosed();
795            return this.messageListener;
796        }
797    
798        /**
799         * Sets the session's distinguished message listener (optional).
800         * <P>
801         * When the distinguished message listener is set, no other form of message
802         * receipt in the session can be used; however, all forms of sending
803         * messages are still supported.
804         * <P>
805         * If this session has been closed, then an {@link IllegalStateException} is
806         * thrown, if trying to set a new listener. However setting the listener
807         * to <tt>null</tt> is allowed, to clear the listener, even if this session
808         * has been closed prior.
809         * <P>
810         * This is an expert facility not used by regular JMS clients.
811         *
812         * @param listener the message listener to associate with this session
813         * @throws JMSException if the JMS provider fails to set the message
814         *                 listener due to an internal error.
815         * @see javax.jms.Session#getMessageListener()
816         * @see javax.jms.ServerSessionPool
817         * @see javax.jms.ServerSession
818         */
819        public void setMessageListener(MessageListener listener) throws JMSException {
820            // only check for closed if we set a new listener, as we allow to clear
821            // the listener, such as when an application is shutting down, and is
822            // no longer using a message listener on this session
823            if (listener != null) {
824                checkClosed();
825            }
826            this.messageListener = listener;
827    
828            if (listener != null) {
829                executor.setDispatchedBySessionPool(true);
830            }
831        }
832    
833        /**
834         * Optional operation, intended to be used only by Application Servers, not
835         * by ordinary JMS clients.
836         *
837         * @see javax.jms.ServerSession
838         */
839        public void run() {
840            MessageDispatch messageDispatch;
841            while ((messageDispatch = executor.dequeueNoWait()) != null) {
842                final MessageDispatch md = messageDispatch;
843                ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
844                if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
845                    // TODO: Ack it without delivery to client
846                    continue;
847                }
848    
849                if (isClientAcknowledge()||isIndividualAcknowledge()) {
850                    message.setAcknowledgeCallback(new Callback() {
851                        public void execute() throws Exception {
852                        }
853                    });
854                }
855    
856                if (deliveryListener != null) {
857                    deliveryListener.beforeDelivery(this, message);
858                }
859    
860                md.setDeliverySequenceId(getNextDeliveryId());
861    
862                final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
863                try {
864                    ack.setFirstMessageId(md.getMessage().getMessageId());
865                    doStartTransaction();
866                    ack.setTransactionId(getTransactionContext().getTransactionId());
867                    if (ack.getTransactionId() != null) {
868                        getTransactionContext().addSynchronization(new Synchronization() {
869    
870                            @Override
871                            public void beforeEnd() throws Exception {
872                                asyncSendPacket(ack);
873                            }
874    
875                            @Override
876                            public void afterRollback() throws Exception {
877                                md.getMessage().onMessageRolledBack();
878                                // ensure we don't filter this as a duplicate
879                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
880                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
881                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
882                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
883                                    && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
884                                    // We need to NACK the messages so that they get
885                                    // sent to the
886                                    // DLQ.
887                                    // Acknowledge the last message.
888                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
889                                    ack.setFirstMessageId(md.getMessage().getMessageId());
890                                    ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
891                                    asyncSendPacket(ack);
892    
893                                } else {
894    
895                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
896                                    ack.setFirstMessageId(md.getMessage().getMessageId());
897                                    asyncSendPacket(ack);
898    
899                                    // Figure out how long we should wait to resend
900                                    // this message.
901                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
902                                    for (int i = 0; i < redeliveryCounter; i++) {
903                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
904                                    }
905                                    connection.getScheduler().executeAfterDelay(new Runnable() {
906    
907                                        public void run() {
908                                            ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
909                                        }
910                                    }, redeliveryDelay);
911                                }
912                            }
913                        });
914                    }
915    
916                    messageListener.onMessage(message);
917    
918                } catch (Throwable e) {
919                    LOG.error("error dispatching message: ", e);
920                    // A problem while invoking the MessageListener does not
921                    // in general indicate a problem with the connection to the broker, i.e.
922                    // it will usually be sufficient to let the afterDelivery() method either
923                    // commit or roll back in order to deal with the exception.
924                    // However, we notify any registered client internal exception listener
925                    // of the problem.
926                    connection.onClientInternalException(e);
927                } finally {
928                    if (ack.getTransactionId() == null) {
929                        try {
930                            asyncSendPacket(ack);
931                        } catch (Throwable e) {
932                            connection.onClientInternalException(e);
933                        }
934                    }
935                }
936    
937                if (deliveryListener != null) {
938                    deliveryListener.afterDelivery(this, message);
939                }
940            }
941        }
942    
943        /**
944         * Creates a <CODE>MessageProducer</CODE> to send messages to the
945         * specified destination.
946         * <P>
947         * A client uses a <CODE>MessageProducer</CODE> object to send messages to
948         * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
949         * inherit from <CODE>Destination</CODE>, they can be used in the
950         * destination parameter to create a <CODE>MessageProducer</CODE> object.
951         *
952         * @param destination the <CODE>Destination</CODE> to send to, or null if
953         *                this is a producer which does not have a specified
954         *                destination.
955         * @return the MessageProducer
956         * @throws JMSException if the session fails to create a MessageProducer due
957         *                 to some internal error.
958         * @throws InvalidDestinationException if an invalid destination is
959         *                 specified.
960         * @since 1.1
961         */
962        public MessageProducer createProducer(Destination destination) throws JMSException {
963            checkClosed();
964            if (destination instanceof CustomDestination) {
965                CustomDestination customDestination = (CustomDestination)destination;
966                return customDestination.createProducer(this);
967            }
968            int timeSendOut = connection.getSendTimeout();
969            return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
970        }
971    
972        /**
973         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
974         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
975         * <CODE>Destination</CODE>, they can be used in the destination
976         * parameter to create a <CODE>MessageConsumer</CODE>.
977         *
978         * @param destination the <CODE>Destination</CODE> to access.
979         * @return the MessageConsumer
980         * @throws JMSException if the session fails to create a consumer due to
981         *                 some internal error.
982         * @throws InvalidDestinationException if an invalid destination is
983         *                 specified.
984         * @since 1.1
985         */
986        public MessageConsumer createConsumer(Destination destination) throws JMSException {
987            return createConsumer(destination, (String) null);
988        }
989    
990        /**
991         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
992         * using a message selector. Since <CODE> Queue</CODE> and
993         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
994         * can be used in the destination parameter to create a
995         * <CODE>MessageConsumer</CODE>.
996         * <P>
997         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
998         * that have been sent to a destination.
999         *
1000         * @param destination the <CODE>Destination</CODE> to access
1001         * @param messageSelector only messages with properties matching the message
1002         *                selector expression are delivered. A value of null or an
1003         *                empty string indicates that there is no message selector
1004         *                for the message consumer.
1005         * @return the MessageConsumer
1006         * @throws JMSException if the session fails to create a MessageConsumer due
1007         *                 to some internal error.
1008         * @throws InvalidDestinationException if an invalid destination is
1009         *                 specified.
1010         * @throws InvalidSelectorException if the message selector is invalid.
1011         * @since 1.1
1012         */
1013        public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
1014            return createConsumer(destination, messageSelector, false);
1015        }
1016    
1017        /**
1018         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1019         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1020         * <CODE>Destination</CODE>, they can be used in the destination
1021         * parameter to create a <CODE>MessageConsumer</CODE>.
1022         *
1023         * @param destination the <CODE>Destination</CODE> to access.
1024         * @param messageListener the listener to use for async consumption of messages
1025         * @return the MessageConsumer
1026         * @throws JMSException if the session fails to create a consumer due to
1027         *                 some internal error.
1028         * @throws InvalidDestinationException if an invalid destination is
1029         *                 specified.
1030         * @since 1.1
1031         */
1032        public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1033            return createConsumer(destination, null, messageListener);
1034        }
1035    
1036        /**
1037         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1038         * using a message selector. Since <CODE> Queue</CODE> and
1039         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1040         * can be used in the destination parameter to create a
1041         * <CODE>MessageConsumer</CODE>.
1042         * <P>
1043         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1044         * that have been sent to a destination.
1045         *
1046         * @param destination the <CODE>Destination</CODE> to access
1047         * @param messageSelector only messages with properties matching the message
1048         *                selector expression are delivered. A value of null or an
1049         *                empty string indicates that there is no message selector
1050         *                for the message consumer.
1051         * @param messageListener the listener to use for async consumption of messages
1052         * @return the MessageConsumer
1053         * @throws JMSException if the session fails to create a MessageConsumer due
1054         *                 to some internal error.
1055         * @throws InvalidDestinationException if an invalid destination is
1056         *                 specified.
1057         * @throws InvalidSelectorException if the message selector is invalid.
1058         * @since 1.1
1059         */
1060        public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1061            return createConsumer(destination, messageSelector, false, messageListener);
1062        }
1063    
1064        /**
1065         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1066         * using a message selector. This method can specify whether messages
1067         * published by its own connection should be delivered to it, if the
1068         * destination is a topic.
1069         * <P>
1070         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1071         * <CODE>Destination</CODE>, they can be used in the destination
1072         * parameter to create a <CODE>MessageConsumer</CODE>.
1073         * <P>
1074         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1075         * that have been published to a destination.
1076         * <P>
1077         * In some cases, a connection may both publish and subscribe to a topic.
1078         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1079         * inhibit the delivery of messages published by its own connection. The
1080         * default value for this attribute is False. The <CODE>noLocal</CODE>
1081         * value must be supported by destinations that are topics.
1082         *
1083         * @param destination the <CODE>Destination</CODE> to access
1084         * @param messageSelector only messages with properties matching the message
1085         *                selector expression are delivered. A value of null or an
1086         *                empty string indicates that there is no message selector
1087         *                for the message consumer.
1088         * @param noLocal - if true, and the destination is a topic, inhibits the
1089         *                delivery of messages published by its own connection. The
1090         *                behavior for <CODE>NoLocal</CODE> is not specified if
1091         *                the destination is a queue.
1092         * @return the MessageConsumer
1093         * @throws JMSException if the session fails to create a MessageConsumer due
1094         *                 to some internal error.
1095         * @throws InvalidDestinationException if an invalid destination is
1096         *                 specified.
1097         * @throws InvalidSelectorException if the message selector is invalid.
1098         * @since 1.1
1099         */
1100        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1101            return createConsumer(destination, messageSelector, noLocal, null);
1102        }
1103    
1104        /**
1105         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1106         * using a message selector. This method can specify whether messages
1107         * published by its own connection should be delivered to it, if the
1108         * destination is a topic.
1109         * <P>
1110         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1111         * <CODE>Destination</CODE>, they can be used in the destination
1112         * parameter to create a <CODE>MessageConsumer</CODE>.
1113         * <P>
1114         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1115         * that have been published to a destination.
1116         * <P>
1117         * In some cases, a connection may both publish and subscribe to a topic.
1118         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1119         * inhibit the delivery of messages published by its own connection. The
1120         * default value for this attribute is False. The <CODE>noLocal</CODE>
1121         * value must be supported by destinations that are topics.
1122         *
1123         * @param destination the <CODE>Destination</CODE> to access
1124         * @param messageSelector only messages with properties matching the message
1125         *                selector expression are delivered. A value of null or an
1126         *                empty string indicates that there is no message selector
1127         *                for the message consumer.
1128         * @param noLocal - if true, and the destination is a topic, inhibits the
1129         *                delivery of messages published by its own connection. The
1130         *                behavior for <CODE>NoLocal</CODE> is not specified if
1131         *                the destination is a queue.
1132         * @param messageListener the listener to use for async consumption of messages
1133         * @return the MessageConsumer
1134         * @throws JMSException if the session fails to create a MessageConsumer due
1135         *                 to some internal error.
1136         * @throws InvalidDestinationException if an invalid destination is
1137         *                 specified.
1138         * @throws InvalidSelectorException if the message selector is invalid.
1139         * @since 1.1
1140         */
1141        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1142            checkClosed();
1143    
1144            if (destination instanceof CustomDestination) {
1145                CustomDestination customDestination = (CustomDestination)destination;
1146                return customDestination.createConsumer(this, messageSelector, noLocal);
1147            }
1148    
1149            ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1150            int prefetch = 0;
1151            if (destination instanceof Topic) {
1152                prefetch = prefetchPolicy.getTopicPrefetch();
1153            } else {
1154                prefetch = prefetchPolicy.getQueuePrefetch();
1155            }
1156            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1157            return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1158                    prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1159        }
1160    
1161        /**
1162         * Creates a queue identity given a <CODE>Queue</CODE> name.
1163         * <P>
1164         * This facility is provided for the rare cases where clients need to
1165         * dynamically manipulate queue identity. It allows the creation of a queue
1166         * identity with a provider-specific name. Clients that depend on this
1167         * ability are not portable.
1168         * <P>
1169         * Note that this method is not for creating the physical queue. The
1170         * physical creation of queues is an administrative task and is not to be
1171         * initiated by the JMS API. The one exception is the creation of temporary
1172         * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1173         * method.
1174         *
1175         * @param queueName the name of this <CODE>Queue</CODE>
1176         * @return a <CODE>Queue</CODE> with the given name
1177         * @throws JMSException if the session fails to create a queue due to some
1178         *                 internal error.
1179         * @since 1.1
1180         */
1181        public Queue createQueue(String queueName) throws JMSException {
1182            checkClosed();
1183            if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1184                return new ActiveMQTempQueue(queueName);
1185            }
1186            return new ActiveMQQueue(queueName);
1187        }
1188    
1189        /**
1190         * Creates a topic identity given a <CODE>Topic</CODE> name.
1191         * <P>
1192         * This facility is provided for the rare cases where clients need to
1193         * dynamically manipulate topic identity. This allows the creation of a
1194         * topic identity with a provider-specific name. Clients that depend on this
1195         * ability are not portable.
1196         * <P>
1197         * Note that this method is not for creating the physical topic. The
1198         * physical creation of topics is an administrative task and is not to be
1199         * initiated by the JMS API. The one exception is the creation of temporary
1200         * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1201         * method.
1202         *
1203         * @param topicName the name of this <CODE>Topic</CODE>
1204         * @return a <CODE>Topic</CODE> with the given name
1205         * @throws JMSException if the session fails to create a topic due to some
1206         *                 internal error.
1207         * @since 1.1
1208         */
1209        public Topic createTopic(String topicName) throws JMSException {
1210            checkClosed();
1211            if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1212                return new ActiveMQTempTopic(topicName);
1213            }
1214            return new ActiveMQTopic(topicName);
1215        }
1216    
1217        /**
1218         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1219         * the specified queue.
1220         *
1221         * @param queue the <CODE>queue</CODE> to access
1222         * @exception InvalidDestinationException if an invalid destination is
1223         *                    specified
1224         * @since 1.1
1225         */
1226        /**
1227         * Creates a durable subscriber to the specified topic.
1228         * <P>
1229         * If a client needs to receive all the messages published on a topic,
1230         * including the ones published while the subscriber is inactive, it uses a
1231         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1232         * record of this durable subscription and insures that all messages from
1233         * the topic's publishers are retained until they are acknowledged by this
1234         * durable subscriber or they have expired.
1235         * <P>
1236         * Sessions with durable subscribers must always provide the same client
1237         * identifier. In addition, each client must specify a name that uniquely
1238         * identifies (within client identifier) each durable subscription it
1239         * creates. Only one session at a time can have a
1240         * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1241         * <P>
1242         * A client can change an existing durable subscription by creating a
1243         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1244         * and/or message selector. Changing a durable subscriber is equivalent to
1245         * unsubscribing (deleting) the old one and creating a new one.
1246         * <P>
1247         * In some cases, a connection may both publish and subscribe to a topic.
1248         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1249         * inhibit the delivery of messages published by its own connection. The
1250         * default value for this attribute is false.
1251         *
1252         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1253         * @param name the name used to identify this subscription
1254         * @return the TopicSubscriber
1255         * @throws JMSException if the session fails to create a subscriber due to
1256         *                 some internal error.
1257         * @throws InvalidDestinationException if an invalid topic is specified.
1258         * @since 1.1
1259         */
1260        public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1261            checkClosed();
1262            return createDurableSubscriber(topic, name, null, false);
1263        }
1264    
1265        /**
1266         * Creates a durable subscriber to the specified topic, using a message
1267         * selector and specifying whether messages published by its own connection
1268         * should be delivered to it.
1269         * <P>
1270         * If a client needs to receive all the messages published on a topic,
1271         * including the ones published while the subscriber is inactive, it uses a
1272         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1273         * record of this durable subscription and insures that all messages from
1274         * the topic's publishers are retained until they are acknowledged by this
1275         * durable subscriber or they have expired.
1276         * <P>
1277         * Sessions with durable subscribers must always provide the same client
1278         * identifier. In addition, each client must specify a name which uniquely
1279         * identifies (within client identifier) each durable subscription it
1280         * creates. Only one session at a time can have a
1281         * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1282         * inactive durable subscriber is one that exists but does not currently
1283         * have a message consumer associated with it.
1284         * <P>
1285         * A client can change an existing durable subscription by creating a
1286         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1287         * and/or message selector. Changing a durable subscriber is equivalent to
1288         * unsubscribing (deleting) the old one and creating a new one.
1289         *
1290         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1291         * @param name the name used to identify this subscription
1292         * @param messageSelector only messages with properties matching the message
1293         *                selector expression are delivered. A value of null or an
1294         *                empty string indicates that there is no message selector
1295         *                for the message consumer.
1296         * @param noLocal if set, inhibits the delivery of messages published by its
1297         *                own connection
1298         * @return the Queue Browser
1299         * @throws JMSException if the session fails to create a subscriber due to
1300         *                 some internal error.
1301         * @throws InvalidDestinationException if an invalid topic is specified.
1302         * @throws InvalidSelectorException if the message selector is invalid.
1303         * @since 1.1
1304         */
1305        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1306            checkClosed();
1307    
1308            if (topic == null) {
1309                throw new InvalidDestinationException("Topic cannot be null");
1310            }
1311    
1312            if (isIndividualAcknowledge()) {
1313                throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
1314                                                 "INDIVIDUAL_ACKNOWLEDGE mode.", null);
1315            }
1316    
1317            if (topic instanceof CustomDestination) {
1318                CustomDestination customDestination = (CustomDestination)topic;
1319                return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1320            }
1321    
1322            connection.checkClientIDWasManuallySpecified();
1323            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1324            int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1325            int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1326            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1327                                               noLocal, false, asyncDispatch);
1328        }
1329    
1330        /**
1331         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1332         * the specified queue.
1333         *
1334         * @param queue the <CODE>queue</CODE> to access
1335         * @return the Queue Browser
1336         * @throws JMSException if the session fails to create a browser due to some
1337         *                 internal error.
1338         * @throws InvalidDestinationException if an invalid destination is
1339         *                 specified
1340         * @since 1.1
1341         */
1342        public QueueBrowser createBrowser(Queue queue) throws JMSException {
1343            checkClosed();
1344            return createBrowser(queue, null);
1345        }
1346    
1347        /**
1348         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1349         * the specified queue using a message selector.
1350         *
1351         * @param queue the <CODE>queue</CODE> to access
1352         * @param messageSelector only messages with properties matching the message
1353         *                selector expression are delivered. A value of null or an
1354         *                empty string indicates that there is no message selector
1355         *                for the message consumer.
1356         * @return the Queue Browser
1357         * @throws JMSException if the session fails to create a browser due to some
1358         *                 internal error.
1359         * @throws InvalidDestinationException if an invalid destination is
1360         *                 specified
1361         * @throws InvalidSelectorException if the message selector is invalid.
1362         * @since 1.1
1363         */
1364        public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1365            checkClosed();
1366            return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1367        }
1368    
1369        /**
1370         * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1371         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1372         *
1373         * @return a temporary queue identity
1374         * @throws JMSException if the session fails to create a temporary queue due
1375         *                 to some internal error.
1376         * @since 1.1
1377         */
1378        public TemporaryQueue createTemporaryQueue() throws JMSException {
1379            checkClosed();
1380            return (TemporaryQueue)connection.createTempDestination(false);
1381        }
1382    
1383        /**
1384         * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1385         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1386         *
1387         * @return a temporary topic identity
1388         * @throws JMSException if the session fails to create a temporary topic due
1389         *                 to some internal error.
1390         * @since 1.1
1391         */
1392        public TemporaryTopic createTemporaryTopic() throws JMSException {
1393            checkClosed();
1394            return (TemporaryTopic)connection.createTempDestination(true);
1395        }
1396    
1397        /**
1398         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1399         * the specified queue.
1400         *
1401         * @param queue the <CODE>Queue</CODE> to access
1402         * @return
1403         * @throws JMSException if the session fails to create a receiver due to
1404         *                 some internal error.
1405         * @throws JMSException
1406         * @throws InvalidDestinationException if an invalid queue is specified.
1407         */
1408        public QueueReceiver createReceiver(Queue queue) throws JMSException {
1409            checkClosed();
1410            return createReceiver(queue, null);
1411        }
1412    
1413        /**
1414         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1415         * the specified queue using a message selector.
1416         *
1417         * @param queue the <CODE>Queue</CODE> to access
1418         * @param messageSelector only messages with properties matching the message
1419         *                selector expression are delivered. A value of null or an
1420         *                empty string indicates that there is no message selector
1421         *                for the message consumer.
1422         * @return QueueReceiver
1423         * @throws JMSException if the session fails to create a receiver due to
1424         *                 some internal error.
1425         * @throws InvalidDestinationException if an invalid queue is specified.
1426         * @throws InvalidSelectorException if the message selector is invalid.
1427         */
1428        public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1429            checkClosed();
1430    
1431            if (queue instanceof CustomDestination) {
1432                CustomDestination customDestination = (CustomDestination)queue;
1433                return customDestination.createReceiver(this, messageSelector);
1434            }
1435    
1436            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1437            return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1438                                             prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1439        }
1440    
1441        /**
1442         * Creates a <CODE>QueueSender</CODE> object to send messages to the
1443         * specified queue.
1444         *
1445         * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1446         *                unidentified producer
1447         * @return QueueSender
1448         * @throws JMSException if the session fails to create a sender due to some
1449         *                 internal error.
1450         * @throws InvalidDestinationException if an invalid queue is specified.
1451         */
1452        public QueueSender createSender(Queue queue) throws JMSException {
1453            checkClosed();
1454            if (queue instanceof CustomDestination) {
1455                CustomDestination customDestination = (CustomDestination)queue;
1456                return customDestination.createSender(this);
1457            }
1458            int timeSendOut = connection.getSendTimeout();
1459            return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1460        }
1461    
1462        /**
1463         * Creates a nondurable subscriber to the specified topic. <p/>
1464         * <P>
1465         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1466         * that have been published to a topic. <p/>
1467         * <P>
1468         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1469         * receive only messages that are published while they are active. <p/>
1470         * <P>
1471         * In some cases, a connection may both publish and subscribe to a topic.
1472         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1473         * inhibit the delivery of messages published by its own connection. The
1474         * default value for this attribute is false.
1475         *
1476         * @param topic the <CODE>Topic</CODE> to subscribe to
1477         * @return TopicSubscriber
1478         * @throws JMSException if the session fails to create a subscriber due to
1479         *                 some internal error.
1480         * @throws InvalidDestinationException if an invalid topic is specified.
1481         */
1482        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1483            checkClosed();
1484            return createSubscriber(topic, null, false);
1485        }
1486    
1487        /**
1488         * Creates a nondurable subscriber to the specified topic, using a message
1489         * selector or specifying whether messages published by its own connection
1490         * should be delivered to it. <p/>
1491         * <P>
1492         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1493         * that have been published to a topic. <p/>
1494         * <P>
1495         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1496         * receive only messages that are published while they are active. <p/>
1497         * <P>
1498         * Messages filtered out by a subscriber's message selector will never be
1499         * delivered to the subscriber. From the subscriber's perspective, they do
1500         * not exist. <p/>
1501         * <P>
1502         * In some cases, a connection may both publish and subscribe to a topic.
1503         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1504         * inhibit the delivery of messages published by its own connection. The
1505         * default value for this attribute is false.
1506         *
1507         * @param topic the <CODE>Topic</CODE> to subscribe to
1508         * @param messageSelector only messages with properties matching the message
1509         *                selector expression are delivered. A value of null or an
1510         *                empty string indicates that there is no message selector
1511         *                for the message consumer.
1512         * @param noLocal if set, inhibits the delivery of messages published by its
1513         *                own connection
1514         * @return TopicSubscriber
1515         * @throws JMSException if the session fails to create a subscriber due to
1516         *                 some internal error.
1517         * @throws InvalidDestinationException if an invalid topic is specified.
1518         * @throws InvalidSelectorException if the message selector is invalid.
1519         */
1520        public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1521            checkClosed();
1522    
1523            if (topic instanceof CustomDestination) {
1524                CustomDestination customDestination = (CustomDestination)topic;
1525                return customDestination.createSubscriber(this, messageSelector, noLocal);
1526            }
1527    
1528            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1529            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1530                .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1531        }
1532    
1533        /**
1534         * Creates a publisher for the specified topic. <p/>
1535         * <P>
1536         * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1537         * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1538         * a topic, it defines a new sequence of messages that have no ordering
1539         * relationship with the messages it has previously sent.
1540         *
1541         * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1542         *                an unidentified producer
1543         * @return TopicPublisher
1544         * @throws JMSException if the session fails to create a publisher due to
1545         *                 some internal error.
1546         * @throws InvalidDestinationException if an invalid topic is specified.
1547         */
1548        public TopicPublisher createPublisher(Topic topic) throws JMSException {
1549            checkClosed();
1550    
1551            if (topic instanceof CustomDestination) {
1552                CustomDestination customDestination = (CustomDestination)topic;
1553                return customDestination.createPublisher(this);
1554            }
1555            int timeSendOut = connection.getSendTimeout();
1556            return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1557        }
1558    
1559        /**
1560         * Unsubscribes a durable subscription that has been created by a client.
1561         * <P>
1562         * This method deletes the state being maintained on behalf of the
1563         * subscriber by its provider.
1564         * <P>
1565         * It is erroneous for a client to delete a durable subscription while there
1566         * is an active <CODE>MessageConsumer </CODE> or
1567         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1568         * message is part of a pending transaction or has not been acknowledged in
1569         * the session.
1570         *
1571         * @param name the name used to identify this subscription
1572         * @throws JMSException if the session fails to unsubscribe to the durable
1573         *                 subscription due to some internal error.
1574         * @throws InvalidDestinationException if an invalid subscription name is
1575         *                 specified.
1576         * @since 1.1
1577         */
1578        public void unsubscribe(String name) throws JMSException {
1579            checkClosed();
1580            connection.unsubscribe(name);
1581        }
1582    
1583        public void dispatch(MessageDispatch messageDispatch) {
1584            try {
1585                executor.execute(messageDispatch);
1586            } catch (InterruptedException e) {
1587                Thread.currentThread().interrupt();
1588                connection.onClientInternalException(e);
1589            }
1590        }
1591    
1592        /**
1593         * Acknowledges all consumed messages of the session of this consumed
1594         * message.
1595         * <P>
1596         * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1597         * for use when a client has specified that its JMS session's consumed
1598         * messages are to be explicitly acknowledged. By invoking
1599         * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1600         * all messages consumed by the session that the message was delivered to.
1601         * <P>
1602         * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1603         * sessions and sessions specified to use implicit acknowledgement modes.
1604         * <P>
1605         * A client may individually acknowledge each message as it is consumed, or
1606         * it may choose to acknowledge messages as an application-defined group
1607         * (which is done by calling acknowledge on the last received message of the
1608         * group, thereby acknowledging all messages consumed by the session.)
1609         * <P>
1610         * Messages that have been received but not acknowledged may be redelivered.
1611         *
1612         * @throws JMSException if the JMS provider fails to acknowledge the
1613         *                 messages due to some internal error.
1614         * @throws javax.jms.IllegalStateException if this method is called on a
1615         *                 closed session.
1616         * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1617         */
1618        public void acknowledge() throws JMSException {
1619            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1620                ActiveMQMessageConsumer c = iter.next();
1621                c.acknowledge();
1622            }
1623        }
1624    
1625        /**
1626         * Add a message consumer.
1627         *
1628         * @param consumer - message consumer.
1629         * @throws JMSException
1630         */
1631        protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1632            this.consumers.add(consumer);
1633            if (consumer.isDurableSubscriber()) {
1634                stats.onCreateDurableSubscriber();
1635            }
1636            this.connection.addDispatcher(consumer.getConsumerId(), this);
1637        }
1638    
1639        /**
1640         * Remove the message consumer.
1641         *
1642         * @param consumer - consumer to be removed.
1643         * @throws JMSException
1644         */
1645        protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1646            this.connection.removeDispatcher(consumer.getConsumerId());
1647            if (consumer.isDurableSubscriber()) {
1648                stats.onRemoveDurableSubscriber();
1649            }
1650            this.consumers.remove(consumer);
1651            this.connection.removeDispatcher(consumer);
1652        }
1653    
1654        /**
1655         * Adds a message producer.
1656         *
1657         * @param producer - message producer to be added.
1658         * @throws JMSException
1659         */
1660        protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1661            this.producers.add(producer);
1662            this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1663        }
1664    
1665        /**
1666         * Removes a message producer.
1667         *
1668         * @param producer - message producer to be removed.
1669         * @throws JMSException
1670         */
1671        protected void removeProducer(ActiveMQMessageProducer producer) {
1672            this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1673            this.producers.remove(producer);
1674        }
1675    
1676        /**
1677         * Start this Session.
1678         *
1679         * @throws JMSException
1680         */
1681        protected void start() throws JMSException {
1682            started.set(true);
1683            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1684                ActiveMQMessageConsumer c = iter.next();
1685                c.start();
1686            }
1687            executor.start();
1688        }
1689    
1690        /**
1691         * Stops this session.
1692         *
1693         * @throws JMSException
1694         */
1695        protected void stop() throws JMSException {
1696    
1697            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1698                ActiveMQMessageConsumer c = iter.next();
1699                c.stop();
1700            }
1701    
1702            started.set(false);
1703            executor.stop();
1704        }
1705    
1706        /**
1707         * Returns the session id.
1708         *
1709         * @return value - session id.
1710         */
1711        protected SessionId getSessionId() {
1712            return info.getSessionId();
1713        }
1714    
1715        /**
1716         * @return
1717         */
1718        protected ConsumerId getNextConsumerId() {
1719            return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1720        }
1721    
1722        /**
1723         * @return
1724         */
1725        protected ProducerId getNextProducerId() {
1726            return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1727        }
1728    
1729        /**
1730         * Sends the message for dispatch by the broker.
1731         *
1732         *
1733         * @param producer - message producer.
1734         * @param destination - message destination.
1735         * @param message - message to be sent.
1736         * @param deliveryMode - JMS messsage delivery mode.
1737         * @param priority - message priority.
1738         * @param timeToLive - message expiration.
1739         * @param producerWindow
1740         * @param onComplete
1741         * @throws JMSException
1742         */
1743        protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1744                            MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1745    
1746            checkClosed();
1747            if (destination.isTemporary() && connection.isDeleted(destination)) {
1748                throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1749            }
1750            synchronized (sendMutex) {
1751                // tell the Broker we are about to start a new transaction
1752                doStartTransaction();
1753                TransactionId txid = transactionContext.getTransactionId();
1754                long sequenceNumber = producer.getMessageSequence();
1755    
1756                //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1757                message.setJMSDeliveryMode(deliveryMode);
1758                long expiration = 0L;
1759                if (!producer.getDisableMessageTimestamp()) {
1760                    long timeStamp = System.currentTimeMillis();
1761                    message.setJMSTimestamp(timeStamp);
1762                    if (timeToLive > 0) {
1763                        expiration = timeToLive + timeStamp;
1764                    }
1765                }
1766                message.setJMSExpiration(expiration);
1767                message.setJMSPriority(priority);
1768                message.setJMSRedelivered(false);
1769    
1770                // transform to our own message format here
1771                ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1772                msg.setDestination(destination);
1773                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1774    
1775                // Set the message id.
1776                if (msg != message) {
1777                    message.setJMSMessageID(msg.getMessageId().toString());
1778                    // Make sure the JMS destination is set on the foreign messages too.
1779                    message.setJMSDestination(destination);
1780                }
1781                //clear the brokerPath in case we are re-sending this message
1782                msg.setBrokerPath(null);
1783    
1784                msg.setTransactionId(txid);
1785                if (connection.isCopyMessageOnSend()) {
1786                    msg = (ActiveMQMessage)msg.copy();
1787                }
1788                msg.setConnection(connection);
1789                msg.onSend();
1790                msg.setProducerId(msg.getMessageId().getProducerId());
1791                if (LOG.isTraceEnabled()) {
1792                    LOG.trace(getSessionId() + " sending message: " + msg);
1793                }
1794                if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1795                    this.connection.asyncSendPacket(msg);
1796                    if (producerWindow != null) {
1797                        // Since we defer lots of the marshaling till we hit the
1798                        // wire, this might not
1799                        // provide and accurate size. We may change over to doing
1800                        // more aggressive marshaling,
1801                        // to get more accurate sizes.. this is more important once
1802                        // users start using producer window
1803                        // flow control.
1804                        int size = msg.getSize();
1805                        producerWindow.increaseUsage(size);
1806                    }
1807                } else {
1808                    if (sendTimeout > 0 && onComplete==null) {
1809                        this.connection.syncSendPacket(msg,sendTimeout);
1810                    }else {
1811                        this.connection.syncSendPacket(msg, onComplete);
1812                    }
1813                }
1814    
1815            }
1816        }
1817    
1818        /**
1819         * Send TransactionInfo to indicate transaction has started
1820         *
1821         * @throws JMSException if some internal error occurs
1822         */
1823        protected void doStartTransaction() throws JMSException {
1824            if (getTransacted() && !transactionContext.isInXATransaction()) {
1825                transactionContext.begin();
1826            }
1827        }
1828    
1829        /**
1830         * Checks whether the session has unconsumed messages.
1831         *
1832         * @return true - if there are unconsumed messages.
1833         */
1834        public boolean hasUncomsumedMessages() {
1835            return executor.hasUncomsumedMessages();
1836        }
1837    
1838        /**
1839         * Checks whether the session uses transactions.
1840         *
1841         * @return true - if the session uses transactions.
1842         */
1843        public boolean isTransacted() {
1844            return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1845        }
1846    
1847        /**
1848         * Checks whether the session used client acknowledgment.
1849         *
1850         * @return true - if the session uses client acknowledgment.
1851         */
1852        protected boolean isClientAcknowledge() {
1853            return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1854        }
1855    
1856        /**
1857         * Checks whether the session used auto acknowledgment.
1858         *
1859         * @return true - if the session uses client acknowledgment.
1860         */
1861        public boolean isAutoAcknowledge() {
1862            return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1863        }
1864    
1865        /**
1866         * Checks whether the session used dup ok acknowledgment.
1867         *
1868         * @return true - if the session uses client acknowledgment.
1869         */
1870        public boolean isDupsOkAcknowledge() {
1871            return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1872        }
1873    
1874        public boolean isIndividualAcknowledge(){
1875            return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1876        }
1877    
1878        /**
1879         * Returns the message delivery listener.
1880         *
1881         * @return deliveryListener - message delivery listener.
1882         */
1883        public DeliveryListener getDeliveryListener() {
1884            return deliveryListener;
1885        }
1886    
1887        /**
1888         * Sets the message delivery listener.
1889         *
1890         * @param deliveryListener - message delivery listener.
1891         */
1892        public void setDeliveryListener(DeliveryListener deliveryListener) {
1893            this.deliveryListener = deliveryListener;
1894        }
1895    
1896        /**
1897         * Returns the SessionInfo bean.
1898         *
1899         * @return info - SessionInfo bean.
1900         * @throws JMSException
1901         */
1902        protected SessionInfo getSessionInfo() throws JMSException {
1903            SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1904            return info;
1905        }
1906    
1907        /**
1908         * Send the asynchronus command.
1909         *
1910         * @param command - command to be executed.
1911         * @throws JMSException
1912         */
1913        public void asyncSendPacket(Command command) throws JMSException {
1914            connection.asyncSendPacket(command);
1915        }
1916    
1917        /**
1918         * Send the synchronus command.
1919         *
1920         * @param command - command to be executed.
1921         * @return Response
1922         * @throws JMSException
1923         */
1924        public Response syncSendPacket(Command command) throws JMSException {
1925            return connection.syncSendPacket(command);
1926        }
1927    
1928        public long getNextDeliveryId() {
1929            return deliveryIdGenerator.getNextSequenceId();
1930        }
1931    
1932        public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1933    
1934            List<MessageDispatch> c = unconsumedMessages.removeAll();
1935            for (MessageDispatch md : c) {
1936                this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1937            }
1938            Collections.reverse(c);
1939    
1940            for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1941                MessageDispatch md = iter.next();
1942                executor.executeFirst(md);
1943            }
1944    
1945        }
1946    
1947        public boolean isRunning() {
1948            return started.get();
1949        }
1950    
1951        public boolean isAsyncDispatch() {
1952            return asyncDispatch;
1953        }
1954    
1955        public void setAsyncDispatch(boolean asyncDispatch) {
1956            this.asyncDispatch = asyncDispatch;
1957        }
1958    
1959        /**
1960         * @return Returns the sessionAsyncDispatch.
1961         */
1962        public boolean isSessionAsyncDispatch() {
1963            return sessionAsyncDispatch;
1964        }
1965    
1966        /**
1967         * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1968         */
1969        public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1970            this.sessionAsyncDispatch = sessionAsyncDispatch;
1971        }
1972    
1973        public MessageTransformer getTransformer() {
1974            return transformer;
1975        }
1976    
1977        public ActiveMQConnection getConnection() {
1978            return connection;
1979        }
1980    
1981        /**
1982         * Sets the transformer used to transform messages before they are sent on
1983         * to the JMS bus or when they are received from the bus but before they are
1984         * delivered to the JMS client
1985         */
1986        public void setTransformer(MessageTransformer transformer) {
1987            this.transformer = transformer;
1988        }
1989    
1990        public BlobTransferPolicy getBlobTransferPolicy() {
1991            return blobTransferPolicy;
1992        }
1993    
1994        /**
1995         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1996         * OBjects) are transferred from producers to brokers to consumers
1997         */
1998        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1999            this.blobTransferPolicy = blobTransferPolicy;
2000        }
2001    
2002        public List<MessageDispatch> getUnconsumedMessages() {
2003            return executor.getUnconsumedMessages();
2004        }
2005    
2006        @Override
2007        public String toString() {
2008            return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
2009        }
2010    
2011        public void checkMessageListener() throws JMSException {
2012            if (messageListener != null) {
2013                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2014            }
2015            for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
2016                ActiveMQMessageConsumer consumer = i.next();
2017                if (consumer.hasMessageListener()) {
2018                    throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2019                }
2020            }
2021        }
2022    
2023        protected void setOptimizeAcknowledge(boolean value) {
2024            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2025                ActiveMQMessageConsumer c = iter.next();
2026                c.setOptimizeAcknowledge(value);
2027            }
2028        }
2029    
2030        protected void setPrefetchSize(ConsumerId id, int prefetch) {
2031            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2032                ActiveMQMessageConsumer c = iter.next();
2033                if (c.getConsumerId().equals(id)) {
2034                    c.setPrefetchSize(prefetch);
2035                    break;
2036                }
2037            }
2038        }
2039    
2040        protected void close(ConsumerId id) {
2041            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2042                ActiveMQMessageConsumer c = iter.next();
2043                if (c.getConsumerId().equals(id)) {
2044                    try {
2045                        c.close();
2046                    } catch (JMSException e) {
2047                        LOG.warn("Exception closing consumer", e);
2048                    }
2049                    LOG.warn("Closed consumer on Command, " + id);
2050                    break;
2051                }
2052            }
2053        }
2054    
2055        public boolean isInUse(ActiveMQTempDestination destination) {
2056            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2057                ActiveMQMessageConsumer c = iter.next();
2058                if (c.isInUse(destination)) {
2059                    return true;
2060                }
2061            }
2062            return false;
2063        }
2064    
2065        /**
2066         * highest sequence id of the last message delivered by this session.
2067         * Passed to the broker in the close command, maintained by dispose()
2068         * @return lastDeliveredSequenceId
2069         */
2070        public long getLastDeliveredSequenceId() {
2071            return lastDeliveredSequenceId;
2072        }
2073    
2074        protected void sendAck(MessageAck ack) throws JMSException {
2075            sendAck(ack,false);
2076        }
2077    
2078        protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2079            if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2080                asyncSendPacket(ack);
2081            } else {
2082                syncSendPacket(ack);
2083            }
2084        }
2085    
2086        protected Scheduler getScheduler() throws JMSException {
2087            return this.connection.getScheduler();
2088        }
2089    
2090        protected ThreadPoolExecutor getConnectionExecutor() {
2091            return this.connectionExecutor;
2092        }
2093    }