001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.LinkedBlockingQueue;
031    import java.util.concurrent.RejectedExecutionHandler;
032    import java.util.concurrent.ThreadFactory;
033    import java.util.concurrent.ThreadPoolExecutor;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.atomic.AtomicBoolean;
036    import java.util.concurrent.atomic.AtomicInteger;
037    
038    import javax.jms.Connection;
039    import javax.jms.ConnectionConsumer;
040    import javax.jms.ConnectionMetaData;
041    import javax.jms.DeliveryMode;
042    import javax.jms.Destination;
043    import javax.jms.ExceptionListener;
044    import javax.jms.IllegalStateException;
045    import javax.jms.InvalidDestinationException;
046    import javax.jms.JMSException;
047    import javax.jms.Queue;
048    import javax.jms.QueueConnection;
049    import javax.jms.QueueSession;
050    import javax.jms.ServerSessionPool;
051    import javax.jms.Session;
052    import javax.jms.Topic;
053    import javax.jms.TopicConnection;
054    import javax.jms.TopicSession;
055    import javax.jms.XAConnection;
056    
057    import org.apache.activemq.advisory.DestinationSource;
058    import org.apache.activemq.blob.BlobTransferPolicy;
059    import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
060    import org.apache.activemq.command.ActiveMQDestination;
061    import org.apache.activemq.command.ActiveMQMessage;
062    import org.apache.activemq.command.ActiveMQTempDestination;
063    import org.apache.activemq.command.ActiveMQTempQueue;
064    import org.apache.activemq.command.ActiveMQTempTopic;
065    import org.apache.activemq.command.BrokerInfo;
066    import org.apache.activemq.command.Command;
067    import org.apache.activemq.command.CommandTypes;
068    import org.apache.activemq.command.ConnectionControl;
069    import org.apache.activemq.command.ConnectionError;
070    import org.apache.activemq.command.ConnectionId;
071    import org.apache.activemq.command.ConnectionInfo;
072    import org.apache.activemq.command.ConsumerControl;
073    import org.apache.activemq.command.ConsumerId;
074    import org.apache.activemq.command.ConsumerInfo;
075    import org.apache.activemq.command.ControlCommand;
076    import org.apache.activemq.command.DestinationInfo;
077    import org.apache.activemq.command.ExceptionResponse;
078    import org.apache.activemq.command.Message;
079    import org.apache.activemq.command.MessageDispatch;
080    import org.apache.activemq.command.MessageId;
081    import org.apache.activemq.command.ProducerAck;
082    import org.apache.activemq.command.ProducerId;
083    import org.apache.activemq.command.RemoveInfo;
084    import org.apache.activemq.command.RemoveSubscriptionInfo;
085    import org.apache.activemq.command.Response;
086    import org.apache.activemq.command.SessionId;
087    import org.apache.activemq.command.ShutdownInfo;
088    import org.apache.activemq.command.WireFormatInfo;
089    import org.apache.activemq.management.JMSConnectionStatsImpl;
090    import org.apache.activemq.management.JMSStatsImpl;
091    import org.apache.activemq.management.StatsCapable;
092    import org.apache.activemq.management.StatsImpl;
093    import org.apache.activemq.state.CommandVisitorAdapter;
094    import org.apache.activemq.thread.Scheduler;
095    import org.apache.activemq.thread.TaskRunnerFactory;
096    import org.apache.activemq.transport.FutureResponse;
097    import org.apache.activemq.transport.ResponseCallback;
098    import org.apache.activemq.transport.Transport;
099    import org.apache.activemq.transport.TransportListener;
100    import org.apache.activemq.transport.failover.FailoverTransport;
101    import org.apache.activemq.util.IdGenerator;
102    import org.apache.activemq.util.IntrospectionSupport;
103    import org.apache.activemq.util.JMSExceptionSupport;
104    import org.apache.activemq.util.LongSequenceGenerator;
105    import org.apache.activemq.util.ServiceSupport;
106    import org.apache.activemq.util.ThreadPoolUtils;
107    import org.slf4j.Logger;
108    import org.slf4j.LoggerFactory;
109    
110    public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
111    
    // Defaults re-exported from ActiveMQConnectionFactory so they can be referenced through this class.
    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
    // Initial value of maxThreadPoolSize for each new connection. Declared non-final,
    // so the value can be reassigned at runtime.
    public static int DEFAULT_THREAD_POOL_SIZE = 1000;

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);

    // Temporary destinations tracked by this connection; key and value have the same type
    // (presumably each destination is mapped to itself - confirm against the code that populates it).
    public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();

    // Passed to every ActiveMQSession created by createSession().
    protected boolean dispatchAsync=true;
    protected boolean alwaysSessionAsync = true;

    private TaskRunnerFactory sessionTaskRunner;
    // Assigned once in the constructor: a ThreadPoolExecutor with core and maximum size of 1.
    private final ThreadPoolExecutor executor;

    // Connection state variables
    private final ConnectionInfo info;
    private ExceptionListener exceptionListener;
    private ClientInternalExceptionListener clientInternalExceptionListener;
    private boolean clientIDSet;
    private boolean isConnectionInfoSentToBroker;
    private boolean userSpecifiedClientID;

    // Configuration options variables
    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    private BlobTransferPolicy blobTransferPolicy;
    private RedeliveryPolicyMap redeliveryPolicyMap;
    private MessageTransformer transformer;

    private boolean disableTimeStampsByDefault;
    private boolean optimizedMessageDispatch = true;
    private boolean copyMessageOnSend = true;
    private boolean useCompression;
    private boolean objectMessageSerializationDefered;
    private boolean useAsyncSend;
    private boolean optimizeAcknowledge;
    private long optimizeAcknowledgeTimeOut = 0;
    private long optimizedAckScheduledAckInterval = 0;
    private boolean nestedMapAndListEnabled = true;
    private boolean useRetroactiveConsumer;
    private boolean exclusiveConsumer;
    private boolean alwaysSyncSend;
    // NOTE(review): the timeout values below are presumably in milliseconds - confirm against their users.
    private int closeTimeout = 15000;
    private boolean watchTopicAdvisories = true;
    private long warnAboutUnstartedConnectionTimeout = 500L;
    private int sendTimeout =0;
    private boolean sendAcksAsync=true;
    private boolean checkForDuplicates = true;
    private boolean queueOnlyConnection = false;

    // Collaborators supplied to (or created by) the constructor.
    private final Transport transport;
    private final IdGenerator clientIdGenerator;
    private final JMSStatsImpl factoryStats;
    private final JMSConnectionStatsImpl stats;

    // Lifecycle flags.
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
    // Child objects owned by this connection; start() and stop() iterate over sessions.
    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
    private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
    private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();

    // Maps ConsumerIds to ActiveMQDispatcher objects
    private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
    // Maps ProducerIds to ActiveMQMessageProducer objects
    private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
    // Source of the sequence numbers used by getNextSessionId().
    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
    // Built in the constructor from this connection's id and the value -1.
    private final SessionId connectionSessionId;
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();

    private AdvisoryConsumer advisoryConsumer;
    // Latch with a count of one (presumably released once a BrokerInfo command arrives - confirm).
    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
    private BrokerInfo brokerInfo;
    private IOException firstFailureError;
    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;

    // Assume that protocol is the latest. Change to the actual protocol
    // version when a WireFormatInfo is received.
    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    // System.currentTimeMillis() captured by the constructor.
    private final long timeCreated;
    private final ConnectionAudit connectionAudit = new ConnectionAudit();
    private DestinationSource destinationSource;
    private final Object ensureConnectionInfoSentMutex = new Object();
    private boolean useDedicatedTaskRunner;
    protected volatile CountDownLatch transportInterruptionProcessingComplete;
    private long consumerFailoverRedeliveryWaitPeriod;
    private Scheduler scheduler;
    private boolean messagePrioritySupported = true;
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;

    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
    private RejectedExecutionHandler rejectedTaskHandler = null;
210    
211        /**
212         * Construct an <code>ActiveMQConnection</code>
213         *
214         * @param transport
215         * @param factoryStats
216         * @throws Exception
217         */
218        protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
219    
220            this.transport = transport;
221            this.clientIdGenerator = clientIdGenerator;
222            this.factoryStats = factoryStats;
223    
224            // Configure a single threaded executor who's core thread can timeout if
225            // idle
226            executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
227                public Thread newThread(Runnable r) {
228                    Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
229                    //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
230                    //thread.setDaemon(true);
231                    return thread;
232                }
233            });
234            // asyncConnectionThread.allowCoreThreadTimeOut(true);
235            String uniqueId = connectionIdGenerator.generateId();
236            this.info = new ConnectionInfo(new ConnectionId(uniqueId));
237            this.info.setManageable(true);
238            this.info.setFaultTolerant(transport.isFaultTolerant());
239            this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
240    
241            this.transport.setTransportListener(this);
242    
243            this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
244            this.factoryStats.addConnection(this);
245            this.timeCreated = System.currentTimeMillis();
246            this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
247        }
248    
249        protected void setUserName(String userName) {
250            this.info.setUserName(userName);
251        }
252    
253        protected void setPassword(String password) {
254            this.info.setPassword(password);
255        }
256    
257        /**
258         * A static helper method to create a new connection
259         *
260         * @return an ActiveMQConnection
261         * @throws JMSException
262         */
263        public static ActiveMQConnection makeConnection() throws JMSException {
264            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
265            return (ActiveMQConnection)factory.createConnection();
266        }
267    
268        /**
269         * A static helper method to create a new connection
270         *
271         * @param uri
272         * @return and ActiveMQConnection
273         * @throws JMSException
274         */
275        public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
276            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
277            return (ActiveMQConnection)factory.createConnection();
278        }
279    
280        /**
281         * A static helper method to create a new connection
282         *
283         * @param user
284         * @param password
285         * @param uri
286         * @return an ActiveMQConnection
287         * @throws JMSException
288         */
289        public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
290            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
291            return (ActiveMQConnection)factory.createConnection();
292        }
293    
294        /**
295         * @return a number unique for this connection
296         */
297        public JMSConnectionStatsImpl getConnectionStats() {
298            return stats;
299        }
300    
301        /**
302         * Creates a <CODE>Session</CODE> object.
303         *
304         * @param transacted indicates whether the session is transacted
305         * @param acknowledgeMode indicates whether the consumer or the client will
306         *                acknowledge any messages it receives; ignored if the
307         *                session is transacted. Legal values are
308         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
309         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
310         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
311         * @return a newly created session
312         * @throws JMSException if the <CODE>Connection</CODE> object fails to
313         *                 create a session due to some internal error or lack of
314         *                 support for the specific transaction and acknowledgement
315         *                 mode.
316         * @see Session#AUTO_ACKNOWLEDGE
317         * @see Session#CLIENT_ACKNOWLEDGE
318         * @see Session#DUPS_OK_ACKNOWLEDGE
319         * @since 1.1
320         */
321        public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
322            checkClosedOrFailed();
323            ensureConnectionInfoSent();
324            if(!transacted) {
325                if (acknowledgeMode==Session.SESSION_TRANSACTED) {
326                    throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
327                } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
328                    throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
329                            "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
330                }
331            }
332            return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
333                ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
334        }
335    
336        /**
337         * @return sessionId
338         */
339        protected SessionId getNextSessionId() {
340            return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
341        }
342    
343        /**
344         * Gets the client identifier for this connection.
345         * <P>
346         * This value is specific to the JMS provider. It is either preconfigured by
347         * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
348         * dynamically by the application by calling the <code>setClientID</code>
349         * method.
350         *
351         * @return the unique client identifier
352         * @throws JMSException if the JMS provider fails to return the client ID
353         *                 for this connection due to some internal error.
354         */
355        public String getClientID() throws JMSException {
356            checkClosedOrFailed();
357            return this.info.getClientId();
358        }
359    
360        /**
361         * Sets the client identifier for this connection.
362         * <P>
363         * The preferred way to assign a JMS client's client identifier is for it to
364         * be configured in a client-specific <CODE>ConnectionFactory</CODE>
365         * object and transparently assigned to the <CODE>Connection</CODE> object
366         * it creates.
367         * <P>
368         * Alternatively, a client can set a connection's client identifier using a
369         * provider-specific value. The facility to set a connection's client
370         * identifier explicitly is not a mechanism for overriding the identifier
371         * that has been administratively configured. It is provided for the case
372         * where no administratively specified identifier exists. If one does exist,
373         * an attempt to change it by setting it must throw an
374         * <CODE>IllegalStateException</CODE>. If a client sets the client
375         * identifier explicitly, it must do so immediately after it creates the
376         * connection and before any other action on the connection is taken. After
377         * this point, setting the client identifier is a programming error that
378         * should throw an <CODE>IllegalStateException</CODE>.
379         * <P>
380         * The purpose of the client identifier is to associate a connection and its
381         * objects with a state maintained on behalf of the client by a provider.
382         * The only such state identified by the JMS API is that required to support
383         * durable subscriptions.
384         * <P>
385         * If another connection with the same <code>clientID</code> is already
386         * running when this method is called, the JMS provider should detect the
387         * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
388         *
389         * @param newClientID the unique client identifier
390         * @throws JMSException if the JMS provider fails to set the client ID for
391         *                 this connection due to some internal error.
392         * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
393         *                 invalid or duplicate client ID.
394         * @throws javax.jms.IllegalStateException if the JMS client attempts to set
395         *                 a connection's client ID at the wrong time or when it has
396         *                 been administratively configured.
397         */
398        public void setClientID(String newClientID) throws JMSException {
399            checkClosedOrFailed();
400    
401            if (this.clientIDSet) {
402                throw new IllegalStateException("The clientID has already been set");
403            }
404    
405            if (this.isConnectionInfoSentToBroker) {
406                throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
407            }
408    
409            this.info.setClientId(newClientID);
410            this.userSpecifiedClientID = true;
411            ensureConnectionInfoSent();
412        }
413    
414        /**
415         * Sets the default client id that the connection will use if explicitly not
416         * set with the setClientId() call.
417         */
418        public void setDefaultClientID(String clientID) throws JMSException {
419            this.info.setClientId(clientID);
420            this.userSpecifiedClientID = true;
421        }
422    
423        /**
424         * Gets the metadata for this connection.
425         *
426         * @return the connection metadata
427         * @throws JMSException if the JMS provider fails to get the connection
428         *                 metadata for this connection.
429         * @see javax.jms.ConnectionMetaData
430         */
431        public ConnectionMetaData getMetaData() throws JMSException {
432            checkClosedOrFailed();
433            return ActiveMQConnectionMetaData.INSTANCE;
434        }
435    
436        /**
437         * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
438         * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
439         * associated with it.
440         *
441         * @return the <CODE>ExceptionListener</CODE> for this connection, or
442         *         null, if no <CODE>ExceptionListener</CODE> is associated with
443         *         this connection.
444         * @throws JMSException if the JMS provider fails to get the
445         *                 <CODE>ExceptionListener</CODE> for this connection.
446         * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
447         */
448        public ExceptionListener getExceptionListener() throws JMSException {
449            checkClosedOrFailed();
450            return this.exceptionListener;
451        }
452    
453        /**
454         * Sets an exception listener for this connection.
455         * <P>
456         * If a JMS provider detects a serious problem with a connection, it informs
457         * the connection's <CODE> ExceptionListener</CODE>, if one has been
458         * registered. It does this by calling the listener's <CODE>onException
459         * </CODE>
460         * method, passing it a <CODE>JMSException</CODE> object describing the
461         * problem.
462         * <P>
463         * An exception listener allows a client to be notified of a problem
464         * asynchronously. Some connections only consume messages, so they would
465         * have no other way to learn their connection has failed.
466         * <P>
467         * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
468         * <P>
469         * A JMS provider should attempt to resolve connection problems itself
470         * before it notifies the client of them.
471         *
472         * @param listener the exception listener
473         * @throws JMSException if the JMS provider fails to set the exception
474         *                 listener for this connection.
475         */
476        public void setExceptionListener(ExceptionListener listener) throws JMSException {
477            checkClosedOrFailed();
478            this.exceptionListener = listener;
479        }
480    
481        /**
482         * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
483         * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
484         * associated with it.
485         *
486         * @return the listener or <code>null</code> if no listener is registered with the connection.
487         */
488        public ClientInternalExceptionListener getClientInternalExceptionListener() {
489            return clientInternalExceptionListener;
490        }
491    
492        /**
493         * Sets a client internal exception listener for this connection.
494         * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
495         * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
496         * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
497         * describing the problem.
498         *
499         * @param listener the exception listener
500         */
501        public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
502            this.clientInternalExceptionListener = listener;
503        }
504    
505        /**
506         * Starts (or restarts) a connection's delivery of incoming messages. A call
507         * to <CODE>start</CODE> on a connection that has already been started is
508         * ignored.
509         *
510         * @throws JMSException if the JMS provider fails to start message delivery
511         *                 due to some internal error.
512         * @see javax.jms.Connection#stop()
513         */
514        public void start() throws JMSException {
515            checkClosedOrFailed();
516            ensureConnectionInfoSent();
517            if (started.compareAndSet(false, true)) {
518                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
519                    ActiveMQSession session = i.next();
520                    session.start();
521                }
522            }
523        }
524    
525        /**
526         * Temporarily stops a connection's delivery of incoming messages. Delivery
527         * can be restarted using the connection's <CODE>start</CODE> method. When
528         * the connection is stopped, delivery to all the connection's message
529         * consumers is inhibited: synchronous receives block, and messages are not
530         * delivered to message listeners.
531         * <P>
532         * This call blocks until receives and/or message listeners in progress have
533         * completed.
534         * <P>
535         * Stopping a connection has no effect on its ability to send messages. A
536         * call to <CODE>stop</CODE> on a connection that has already been stopped
537         * is ignored.
538         * <P>
539         * A call to <CODE>stop</CODE> must not return until delivery of messages
540         * has paused. This means that a client can rely on the fact that none of
541         * its message listeners will be called and that all threads of control
542         * waiting for <CODE>receive</CODE> calls to return will not return with a
543         * message until the connection is restarted. The receive timers for a
544         * stopped connection continue to advance, so receives may time out while
545         * the connection is stopped.
546         * <P>
547         * If message listeners are running when <CODE>stop</CODE> is invoked, the
548         * <CODE>stop</CODE> call must wait until all of them have returned before
549         * it may return. While these message listeners are completing, they must
550         * have the full services of the connection available to them.
551         *
552         * @throws JMSException if the JMS provider fails to stop message delivery
553         *                 due to some internal error.
554         * @see javax.jms.Connection#start()
555         */
556        public void stop() throws JMSException {
557            checkClosedOrFailed();
558            if (started.compareAndSet(true, false)) {
559                synchronized(sessions) {
560                    for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
561                        ActiveMQSession s = i.next();
562                        s.stop();
563                    }
564                }
565            }
566        }
567    
568        /**
569         * Closes the connection.
570         * <P>
571         * Since a provider typically allocates significant resources outside the
572         * JVM on behalf of a connection, clients should close these resources when
573         * they are not needed. Relying on garbage collection to eventually reclaim
574         * these resources may not be timely enough.
575         * <P>
576         * There is no need to close the sessions, producers, and consumers of a
577         * closed connection.
578         * <P>
579         * Closing a connection causes all temporary destinations to be deleted.
580         * <P>
581         * When this method is invoked, it should not return until message
582         * processing has been shut down in an orderly fashion. This means that all
583         * message listeners that may have been running have returned, and that all
584         * pending receives have returned. A close terminates all pending message
585         * receives on the connection's sessions' consumers. The receives may return
586         * with a message or with null, depending on whether there was a message
587         * available at the time of the close. If one or more of the connection's
588         * sessions' message listeners is processing a message at the time when
589         * connection <CODE>close</CODE> is invoked, all the facilities of the
590         * connection and its sessions must remain available to those listeners
591         * until they return control to the JMS provider.
592         * <P>
593         * Closing a connection causes any of its sessions' transactions in progress
594         * to be rolled back. In the case where a session's work is coordinated by
595         * an external transaction manager, a session's <CODE>commit</CODE> and
596         * <CODE> rollback</CODE> methods are not used and the result of a closed
597         * session's work is determined later by the transaction manager. Closing a
598         * connection does NOT force an acknowledgment of client-acknowledged
599         * sessions.
600         * <P>
601         * Invoking the <CODE>acknowledge</CODE> method of a received message from
602         * a closed connection's session must throw an
603         * <CODE>IllegalStateException</CODE>. Closing a closed connection must
604         * NOT throw an exception.
605         *
606         * @throws JMSException if the JMS provider fails to close the connection
607         *                 due to some internal error. For example, a failure to
608         *                 release resources or to close a socket connection can
609         *                 cause this exception to be thrown.
610         */
611        public void close() throws JMSException {
612            // Store the interrupted state and clear so that cleanup happens without
613            // leaking connection resources.  Reset in finally to preserve state.
614            boolean interrupted = Thread.interrupted();
615    
616            try {
617    
618                // If we were running, lets stop first.
619                if (!closed.get() && !transportFailed.get()) {
620                    stop();
621                }
622    
623                synchronized (this) {
624                    if (!closed.get()) {
625                        closing.set(true);
626    
627                        if (destinationSource != null) {
628                            destinationSource.stop();
629                            destinationSource = null;
630                        }
631                        if (advisoryConsumer != null) {
632                            advisoryConsumer.dispose();
633                            advisoryConsumer = null;
634                        }
635    
636                        Scheduler scheduler = this.scheduler;
637                        if (scheduler != null) {
638                            try {
639                                scheduler.stop();
640                            } catch (Exception e) {
641                                JMSException ex =  JMSExceptionSupport.create(e);
642                                throw ex;
643                            }
644                        }
645    
646                        long lastDeliveredSequenceId = 0;
647                        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
648                            ActiveMQSession s = i.next();
649                            s.dispose();
650                            lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
651                        }
652                        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
653                            ActiveMQConnectionConsumer c = i.next();
654                            c.dispose();
655                        }
656                        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
657                            ActiveMQInputStream c = i.next();
658                            c.dispose();
659                        }
660                        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
661                            ActiveMQOutputStream c = i.next();
662                            c.dispose();
663                        }
664    
665                        // As TemporaryQueue and TemporaryTopic instances are bound
666                        // to a connection we should just delete them after the connection
667                        // is closed to free up memory
668                        cleanUpTempDestinations();
669    
670                        if (isConnectionInfoSentToBroker) {
671                            // If we announced ourselves to the broker.. Try to let the broker
672                            // know that the connection is being shutdown.
673                            RemoveInfo removeCommand = info.createRemoveCommand();
674                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
675                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
676                            doAsyncSendPacket(new ShutdownInfo());
677                        }
678    
679                        started.set(false);
680    
681                        // TODO if we move the TaskRunnerFactory to the connection
682                        // factory
683                        // then we may need to call
684                        // factory.onConnectionClose(this);
685                        if (sessionTaskRunner != null) {
686                            sessionTaskRunner.shutdown();
687                        }
688                        closed.set(true);
689                        closing.set(false);
690                    }
691                }
692            } finally {
693                try {
694                    if (executor != null) {
695                        ThreadPoolUtils.shutdown(executor);
696                    }
697                } catch (Throwable e) {
698                    LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
699                }
700    
701                ServiceSupport.dispose(this.transport);
702    
703                factoryStats.removeConnection(this);
704                if (interrupted) {
705                    Thread.currentThread().interrupt();
706                }
707            }
708        }
709    
710        /**
711         * Tells the broker to terminate its VM. This can be used to cleanly
712         * terminate a broker running in a standalone java process. Server must have
713         * property enable.vm.shutdown=true defined to allow this to work.
714         */
715        // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
716        // implemented.
717        /*
718         * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
719         * command = new BrokerAdminCommand();
720         * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
721         * asyncSendPacket(command); }
722         */
723    
724        /**
725         * Create a durable connection consumer for this connection (optional
726         * operation). This is an expert facility not used by regular JMS clients.
727         *
728         * @param topic topic to access
729         * @param subscriptionName durable subscription name
730         * @param messageSelector only messages with properties matching the message
731         *                selector expression are delivered. A value of null or an
732         *                empty string indicates that there is no message selector
733         *                for the message consumer.
734         * @param sessionPool the server session pool to associate with this durable
735         *                connection consumer
736         * @param maxMessages the maximum number of messages that can be assigned to
737         *                a server session at one time
738         * @return the durable connection consumer
739         * @throws JMSException if the <CODE>Connection</CODE> object fails to
740         *                 create a connection consumer due to some internal error
741         *                 or invalid arguments for <CODE>sessionPool</CODE> and
742         *                 <CODE>messageSelector</CODE>.
743         * @throws javax.jms.InvalidDestinationException if an invalid destination
744         *                 is specified.
745         * @throws javax.jms.InvalidSelectorException if the message selector is
746         *                 invalid.
747         * @see javax.jms.ConnectionConsumer
748         * @since 1.1
749         */
750        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
751            throws JMSException {
752            return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
753        }
754    
755        /**
756         * Create a durable connection consumer for this connection (optional
757         * operation). This is an expert facility not used by regular JMS clients.
758         *
759         * @param topic topic to access
760         * @param subscriptionName durable subscription name
761         * @param messageSelector only messages with properties matching the message
762         *                selector expression are delivered. A value of null or an
763         *                empty string indicates that there is no message selector
764         *                for the message consumer.
765         * @param sessionPool the server session pool to associate with this durable
766         *                connection consumer
767         * @param maxMessages the maximum number of messages that can be assigned to
768         *                a server session at one time
769         * @param noLocal set true if you want to filter out messages published
770         *                locally
771         * @return the durable connection consumer
772         * @throws JMSException if the <CODE>Connection</CODE> object fails to
773         *                 create a connection consumer due to some internal error
774         *                 or invalid arguments for <CODE>sessionPool</CODE> and
775         *                 <CODE>messageSelector</CODE>.
776         * @throws javax.jms.InvalidDestinationException if an invalid destination
777         *                 is specified.
778         * @throws javax.jms.InvalidSelectorException if the message selector is
779         *                 invalid.
780         * @see javax.jms.ConnectionConsumer
781         * @since 1.1
782         */
783        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
784                                                                  boolean noLocal) throws JMSException {
785            checkClosedOrFailed();
786    
787            if (queueOnlyConnection) {
788                throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
789            }
790    
791            ensureConnectionInfoSent();
792            SessionId sessionId = new SessionId(info.getConnectionId(), -1);
793            ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
794            info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
795            info.setSubscriptionName(subscriptionName);
796            info.setSelector(messageSelector);
797            info.setPrefetchSize(maxMessages);
798            info.setDispatchAsync(isDispatchAsync());
799    
800            // Allows the options on the destination to configure the consumerInfo
801            if (info.getDestination().getOptions() != null) {
802                Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
803                IntrospectionSupport.setProperties(this.info, options, "consumer.");
804            }
805    
806            return new ActiveMQConnectionConsumer(this, sessionPool, info);
807        }
808    
809        // Properties
810        // -------------------------------------------------------------------------
811    
812        /**
813         * Returns true if this connection has been started
814         *
815         * @return true if this Connection is started
816         */
817        public boolean isStarted() {
818            return started.get();
819        }
820    
821        /**
822         * Returns true if the connection is closed
823         */
824        public boolean isClosed() {
825            return closed.get();
826        }
827    
828        /**
829         * Returns true if the connection is in the process of being closed
830         */
831        public boolean isClosing() {
832            return closing.get();
833        }
834    
835        /**
836         * Returns true if the underlying transport has failed
837         */
838        public boolean isTransportFailed() {
839            return transportFailed.get();
840        }
841    
842        /**
843         * @return Returns the prefetchPolicy.
844         */
845        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
846            return prefetchPolicy;
847        }
848    
849        /**
850         * Sets the <a
851         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
852         * policy</a> for consumers created by this connection.
853         */
854        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
855            this.prefetchPolicy = prefetchPolicy;
856        }
857    
858        /**
859         */
860        public Transport getTransportChannel() {
861            return transport;
862        }
863    
864        /**
865         * @return Returns the clientID of the connection, forcing one to be
866         *         generated if one has not yet been configured.
867         */
868        public String getInitializedClientID() throws JMSException {
869            ensureConnectionInfoSent();
870            return info.getClientId();
871        }
872    
873        /**
874         * @return Returns the timeStampsDisableByDefault.
875         */
876        public boolean isDisableTimeStampsByDefault() {
877            return disableTimeStampsByDefault;
878        }
879    
880        /**
881         * Sets whether or not timestamps on messages should be disabled or not. If
882         * you disable them it adds a small performance boost.
883         */
884        public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
885            this.disableTimeStampsByDefault = timeStampsDisableByDefault;
886        }
887    
888        /**
889         * @return Returns the dispatchOptimizedMessage.
890         */
891        public boolean isOptimizedMessageDispatch() {
892            return optimizedMessageDispatch;
893        }
894    
895        /**
896         * If this flag is set then a larger prefetch limit is used - only
897         * applicable for durable topic subscribers.
898         */
899        public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
900            this.optimizedMessageDispatch = dispatchOptimizedMessage;
901        }
902    
903        /**
904         * @return Returns the closeTimeout.
905         */
906        public int getCloseTimeout() {
907            return closeTimeout;
908        }
909    
910        /**
911         * Sets the timeout before a close is considered complete. Normally a
912         * close() on a connection waits for confirmation from the broker; this
913         * allows that operation to timeout to save the client hanging if there is
914         * no broker
915         */
916        public void setCloseTimeout(int closeTimeout) {
917            this.closeTimeout = closeTimeout;
918        }
919    
920        /**
921         * @return ConnectionInfo
922         */
923        public ConnectionInfo getConnectionInfo() {
924            return this.info;
925        }
926    
927        public boolean isUseRetroactiveConsumer() {
928            return useRetroactiveConsumer;
929        }
930    
931        /**
932         * Sets whether or not retroactive consumers are enabled. Retroactive
933         * consumers allow non-durable topic subscribers to receive old messages
934         * that were published before the non-durable subscriber started.
935         */
936        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
937            this.useRetroactiveConsumer = useRetroactiveConsumer;
938        }
939    
940        public boolean isNestedMapAndListEnabled() {
941            return nestedMapAndListEnabled;
942        }
943    
944        /**
945         * Enables/disables whether or not Message properties and MapMessage entries
946         * support <a
947         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
948         * Structures</a> of Map and List objects
949         */
950        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
951            this.nestedMapAndListEnabled = structuredMapsEnabled;
952        }
953    
954        public boolean isExclusiveConsumer() {
955            return exclusiveConsumer;
956        }
957    
958        /**
959         * Enables or disables whether or not queue consumers should be exclusive or
960         * not for example to preserve ordering when not using <a
961         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
962         *
963         * @param exclusiveConsumer
964         */
965        public void setExclusiveConsumer(boolean exclusiveConsumer) {
966            this.exclusiveConsumer = exclusiveConsumer;
967        }
968    
969        /**
970         * Adds a transport listener so that a client can be notified of events in
971         * the underlying transport
972         */
973        public void addTransportListener(TransportListener transportListener) {
974            transportListeners.add(transportListener);
975        }
976    
977        public void removeTransportListener(TransportListener transportListener) {
978            transportListeners.remove(transportListener);
979        }
980    
981        public boolean isUseDedicatedTaskRunner() {
982            return useDedicatedTaskRunner;
983        }
984    
985        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
986            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
987        }
988    
989        public TaskRunnerFactory getSessionTaskRunner() {
990            synchronized (this) {
991                if (sessionTaskRunner == null) {
992                    sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
993                    sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
994                }
995            }
996            return sessionTaskRunner;
997        }
998    
999        public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1000            this.sessionTaskRunner = sessionTaskRunner;
1001        }
1002    
1003        public MessageTransformer getTransformer() {
1004            return transformer;
1005        }
1006    
1007        /**
1008         * Sets the transformer used to transform messages before they are sent on
1009         * to the JMS bus or when they are received from the bus but before they are
1010         * delivered to the JMS client
1011         */
1012        public void setTransformer(MessageTransformer transformer) {
1013            this.transformer = transformer;
1014        }
1015    
1016        /**
1017         * @return the statsEnabled
1018         */
1019        public boolean isStatsEnabled() {
1020            return this.stats.isEnabled();
1021        }
1022    
1023        /**
1024         * @param statsEnabled the statsEnabled to set
1025         */
1026        public void setStatsEnabled(boolean statsEnabled) {
1027            this.stats.setEnabled(statsEnabled);
1028        }
1029    
1030        /**
1031         * Returns the {@link DestinationSource} object which can be used to listen to destinations
1032         * being created or destroyed or to enquire about the current destinations available on the broker
1033         *
1034         * @return a lazily created destination source
1035         * @throws JMSException
1036         */
1037        public DestinationSource getDestinationSource() throws JMSException {
1038            if (destinationSource == null) {
1039                destinationSource = new DestinationSource(this);
1040                destinationSource.start();
1041            }
1042            return destinationSource;
1043        }
1044    
1045        // Implementation methods
1046        // -------------------------------------------------------------------------
1047    
1048        /**
1049         * Used internally for adding Sessions to the Connection
1050         *
1051         * @param session
1052         * @throws JMSException
1053         * @throws JMSException
1054         */
1055        protected void addSession(ActiveMQSession session) throws JMSException {
1056            this.sessions.add(session);
1057            if (sessions.size() > 1 || session.isTransacted()) {
1058                optimizedMessageDispatch = false;
1059            }
1060        }
1061    
1062        /**
1063         * Used internally for removing Sessions from a Connection
1064         *
1065         * @param session
1066         */
1067        protected void removeSession(ActiveMQSession session) {
1068            this.sessions.remove(session);
1069            this.removeDispatcher(session);
1070        }
1071    
1072        /**
1073         * Add a ConnectionConsumer
1074         *
1075         * @param connectionConsumer
1076         * @throws JMSException
1077         */
1078        protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1079            this.connectionConsumers.add(connectionConsumer);
1080        }
1081    
1082        /**
1083         * Remove a ConnectionConsumer
1084         *
1085         * @param connectionConsumer
1086         */
1087        protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1088            this.connectionConsumers.remove(connectionConsumer);
1089            this.removeDispatcher(connectionConsumer);
1090        }
1091    
1092        /**
1093         * Creates a <CODE>TopicSession</CODE> object.
1094         *
1095         * @param transacted indicates whether the session is transacted
1096         * @param acknowledgeMode indicates whether the consumer or the client will
1097         *                acknowledge any messages it receives; ignored if the
1098         *                session is transacted. Legal values are
1099         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1100         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1101         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1102         * @return a newly created topic session
1103         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1104         *                 to create a session due to some internal error or lack of
1105         *                 support for the specific transaction and acknowledgement
1106         *                 mode.
1107         * @see Session#AUTO_ACKNOWLEDGE
1108         * @see Session#CLIENT_ACKNOWLEDGE
1109         * @see Session#DUPS_OK_ACKNOWLEDGE
1110         */
1111        public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1112            return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1113        }
1114    
1115        /**
1116         * Creates a connection consumer for this connection (optional operation).
1117         * This is an expert facility not used by regular JMS clients.
1118         *
1119         * @param topic the topic to access
1120         * @param messageSelector only messages with properties matching the message
1121         *                selector expression are delivered. A value of null or an
1122         *                empty string indicates that there is no message selector
1123         *                for the message consumer.
1124         * @param sessionPool the server session pool to associate with this
1125         *                connection consumer
1126         * @param maxMessages the maximum number of messages that can be assigned to
1127         *                a server session at one time
1128         * @return the connection consumer
1129         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1130         *                 to create a connection consumer due to some internal
1131         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1132         *                 and <CODE>messageSelector</CODE>.
1133         * @throws javax.jms.InvalidDestinationException if an invalid topic is
1134         *                 specified.
1135         * @throws javax.jms.InvalidSelectorException if the message selector is
1136         *                 invalid.
1137         * @see javax.jms.ConnectionConsumer
1138         */
1139        public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1140            return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1141        }
1142    
1143        /**
1144         * Creates a connection consumer for this connection (optional operation).
1145         * This is an expert facility not used by regular JMS clients.
1146         *
1147         * @param queue the queue to access
1148         * @param messageSelector only messages with properties matching the message
1149         *                selector expression are delivered. A value of null or an
1150         *                empty string indicates that there is no message selector
1151         *                for the message consumer.
1152         * @param sessionPool the server session pool to associate with this
1153         *                connection consumer
1154         * @param maxMessages the maximum number of messages that can be assigned to
1155         *                a server session at one time
1156         * @return the connection consumer
1157         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1158         *                 to create a connection consumer due to some internal
1159         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1160         *                 and <CODE>messageSelector</CODE>.
1161         * @throws javax.jms.InvalidDestinationException if an invalid queue is
1162         *                 specified.
1163         * @throws javax.jms.InvalidSelectorException if the message selector is
1164         *                 invalid.
1165         * @see javax.jms.ConnectionConsumer
1166         */
1167        public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1168            return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1169        }
1170    
1171        /**
1172         * Creates a connection consumer for this connection (optional operation).
1173         * This is an expert facility not used by regular JMS clients.
1174         *
1175         * @param destination the destination to access
1176         * @param messageSelector only messages with properties matching the message
1177         *                selector expression are delivered. A value of null or an
1178         *                empty string indicates that there is no message selector
1179         *                for the message consumer.
1180         * @param sessionPool the server session pool to associate with this
1181         *                connection consumer
1182         * @param maxMessages the maximum number of messages that can be assigned to
1183         *                a server session at one time
1184         * @return the connection consumer
1185         * @throws JMSException if the <CODE>Connection</CODE> object fails to
1186         *                 create a connection consumer due to some internal error
1187         *                 or invalid arguments for <CODE>sessionPool</CODE> and
1188         *                 <CODE>messageSelector</CODE>.
1189         * @throws javax.jms.InvalidDestinationException if an invalid destination
1190         *                 is specified.
1191         * @throws javax.jms.InvalidSelectorException if the message selector is
1192         *                 invalid.
1193         * @see javax.jms.ConnectionConsumer
1194         * @since 1.1
1195         */
1196        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1197            return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1198        }
1199    
1200        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1201            throws JMSException {
1202    
1203            checkClosedOrFailed();
1204            ensureConnectionInfoSent();
1205    
1206            ConsumerId consumerId = createConsumerId();
1207            ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1208            consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1209            consumerInfo.setSelector(messageSelector);
1210            consumerInfo.setPrefetchSize(maxMessages);
1211            consumerInfo.setNoLocal(noLocal);
1212            consumerInfo.setDispatchAsync(isDispatchAsync());
1213    
1214            // Allows the options on the destination to configure the consumerInfo
1215            if (consumerInfo.getDestination().getOptions() != null) {
1216                Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1217                IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1218            }
1219    
1220            return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1221        }
1222    
1223        /**
1224         * @return
1225         */
1226        private ConsumerId createConsumerId() {
1227            return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1228        }
1229    
1230        /**
1231         * @return
1232         */
1233        private ProducerId createProducerId() {
1234            return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1235        }
1236    
1237        /**
1238         * Creates a <CODE>QueueSession</CODE> object.
1239         *
1240         * @param transacted indicates whether the session is transacted
1241         * @param acknowledgeMode indicates whether the consumer or the client will
1242         *                acknowledge any messages it receives; ignored if the
1243         *                session is transacted. Legal values are
1244         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1245         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1246         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1247         * @return a newly created queue session
1248         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1249         *                 to create a session due to some internal error or lack of
1250         *                 support for the specific transaction and acknowledgement
1251         *                 mode.
1252         * @see Session#AUTO_ACKNOWLEDGE
1253         * @see Session#CLIENT_ACKNOWLEDGE
1254         * @see Session#DUPS_OK_ACKNOWLEDGE
1255         */
1256        public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1257            return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1258        }
1259    
1260        /**
1261         * Ensures that the clientID was manually specified and not auto-generated.
1262         * If the clientID was not specified this method will throw an exception.
1263         * This method is used to ensure that the clientID + durableSubscriber name
1264         * are used correctly.
1265         *
1266         * @throws JMSException
1267         */
1268        public void checkClientIDWasManuallySpecified() throws JMSException {
1269            if (!userSpecifiedClientID) {
1270                throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1271            }
1272        }
1273    
1274        /**
1275         * send a Packet through the Connection - for internal use only
1276         *
1277         * @param command
1278         * @throws JMSException
1279         */
1280        public void asyncSendPacket(Command command) throws JMSException {
1281            if (isClosed()) {
1282                throw new ConnectionClosedException();
1283            } else {
1284                doAsyncSendPacket(command);
1285            }
1286        }
1287    
1288        private void doAsyncSendPacket(Command command) throws JMSException {
1289            try {
1290                this.transport.oneway(command);
1291            } catch (IOException e) {
1292                throw JMSExceptionSupport.create(e);
1293            }
1294        }
1295    
1296        /**
1297         * Send a packet through a Connection - for internal use only
1298         *
1299         * @param command
1300         * @return
1301         * @throws JMSException
1302         */
1303        public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1304            if(onComplete==null) {
1305                syncSendPacket(command);
1306            } else {
1307                if (isClosed()) {
1308                    throw new ConnectionClosedException();
1309                }
1310                try {
1311                    this.transport.asyncRequest(command, new ResponseCallback() {
1312                        @Override
1313                        public void onCompletion(FutureResponse resp) {
1314                            Response response;
1315                            Throwable exception = null;
1316                            try {
1317                                response = resp.getResult();
1318                                if (response.isException()) {
1319                                    ExceptionResponse er = (ExceptionResponse)response;
1320                                    exception = er.getException();
1321                                }
1322                            } catch (Exception e) {
1323                                exception = e;
1324                            }
1325                            if(exception!=null) {
1326                                if ( exception instanceof JMSException) {
1327                                    onComplete.onException((JMSException) exception);
1328                                } else {
1329                                    if (isClosed()||closing.get()) {
1330                                        LOG.debug("Received an exception but connection is closing");
1331                                    }
1332                                    JMSException jmsEx = null;
1333                                    try {
1334                                        jmsEx = JMSExceptionSupport.create(exception);
1335                                    } catch(Throwable e) {
1336                                        LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1337                                    }
1338                                    // dispose of transport for security exceptions on connection initiation
1339                                    if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1340                                        Transport t = transport;
1341                                        if (null != t){
1342                                            ServiceSupport.dispose(t);
1343                                        }
1344                                    }
1345                                    if (jmsEx !=null) {
1346                                        onComplete.onException(jmsEx);
1347                                    }
1348                                }
1349                            } else {
1350                                onComplete.onSuccess();
1351                            }
1352                        }
1353                    });
1354                } catch (IOException e) {
1355                    throw JMSExceptionSupport.create(e);
1356                }
1357            }
1358        }
1359    
1360        public Response syncSendPacket(Command command) throws JMSException {
1361            if (isClosed()) {
1362                throw new ConnectionClosedException();
1363            } else {
1364    
1365                try {
1366                    Response response = (Response)this.transport.request(command);
1367                    if (response.isException()) {
1368                        ExceptionResponse er = (ExceptionResponse)response;
1369                        if (er.getException() instanceof JMSException) {
1370                            throw (JMSException)er.getException();
1371                        } else {
1372                            if (isClosed()||closing.get()) {
1373                                LOG.debug("Received an exception but connection is closing");
1374                            }
1375                            JMSException jmsEx = null;
1376                            try {
1377                                jmsEx = JMSExceptionSupport.create(er.getException());
1378                            } catch(Throwable e) {
1379                                LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1380                            }
1381                            //dispose of transport for security exceptions
1382                            if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1383                                Transport t = this.transport;
1384                                if (null != t){
1385                                    ServiceSupport.dispose(t);
1386                                }
1387                            }
1388                            if (jmsEx !=null) {
1389                                throw jmsEx;
1390                            }
1391                        }
1392                    }
1393                    return response;
1394                } catch (IOException e) {
1395                    throw JMSExceptionSupport.create(e);
1396                }
1397            }
1398        }
1399    
1400        /**
1401         * Send a packet through a Connection - for internal use only
1402         *
1403         * @param command
1404         * @return
1405         * @throws JMSException
1406         */
1407        public Response syncSendPacket(Command command, int timeout) throws JMSException {
1408            if (isClosed() || closing.get()) {
1409                throw new ConnectionClosedException();
1410            } else {
1411                return doSyncSendPacket(command, timeout);
1412            }
1413        }
1414    
1415        private Response doSyncSendPacket(Command command, int timeout)
1416                throws JMSException {
1417            try {
1418                Response response = (Response) (timeout > 0
1419                        ? this.transport.request(command, timeout)
1420                        : this.transport.request(command));
1421                if (response != null && response.isException()) {
1422                    ExceptionResponse er = (ExceptionResponse)response;
1423                    if (er.getException() instanceof JMSException) {
1424                        throw (JMSException)er.getException();
1425                    } else {
1426                        throw JMSExceptionSupport.create(er.getException());
1427                    }
1428                }
1429                return response;
1430            } catch (IOException e) {
1431                throw JMSExceptionSupport.create(e);
1432            }
1433        }
1434    
1435        /**
1436         * @return statistics for this Connection
1437         */
1438        public StatsImpl getStats() {
1439            return stats;
1440        }
1441    
1442        /**
1443         * simply throws an exception if the Connection is already closed or the
1444         * Transport has failed
1445         *
1446         * @throws JMSException
1447         */
1448        protected synchronized void checkClosedOrFailed() throws JMSException {
1449            checkClosed();
1450            if (transportFailed.get()) {
1451                throw new ConnectionFailedException(firstFailureError);
1452            }
1453        }
1454    
1455        /**
1456         * simply throws an exception if the Connection is already closed
1457         *
1458         * @throws JMSException
1459         */
1460        protected synchronized void checkClosed() throws JMSException {
1461            if (closed.get()) {
1462                throw new ConnectionClosedException();
1463            }
1464        }
1465    
1466        /**
1467         * Send the ConnectionInfo to the Broker
1468         *
1469         * @throws JMSException
1470         */
1471        protected void ensureConnectionInfoSent() throws JMSException {
1472            synchronized(this.ensureConnectionInfoSentMutex) {
1473                // Can we skip sending the ConnectionInfo packet??
1474                if (isConnectionInfoSentToBroker || closed.get()) {
1475                    return;
1476                }
1477                //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1478                if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1479                    info.setClientId(clientIdGenerator.generateId());
1480                }
1481                syncSendPacket(info.copy());
1482    
1483                this.isConnectionInfoSentToBroker = true;
1484                // Add a temp destination advisory consumer so that
1485                // We know what the valid temporary destinations are on the
1486                // broker without having to do an RPC to the broker.
1487    
1488                ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1489                if (watchTopicAdvisories) {
1490                    advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1491                }
1492            }
1493        }
1494    
1495        public synchronized boolean isWatchTopicAdvisories() {
1496            return watchTopicAdvisories;
1497        }
1498    
1499        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1500            this.watchTopicAdvisories = watchTopicAdvisories;
1501        }
1502    
1503        /**
1504         * @return Returns the useAsyncSend.
1505         */
1506        public boolean isUseAsyncSend() {
1507            return useAsyncSend;
1508        }
1509    
1510        /**
1511         * Forces the use of <a
1512         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1513         * adds a massive performance boost; but means that the send() method will
1514         * return immediately whether the message has been sent or not which could
1515         * lead to message loss.
1516         */
1517        public void setUseAsyncSend(boolean useAsyncSend) {
1518            this.useAsyncSend = useAsyncSend;
1519        }
1520    
1521        /**
1522         * @return true if always sync send messages
1523         */
1524        public boolean isAlwaysSyncSend() {
1525            return this.alwaysSyncSend;
1526        }
1527    
1528        /**
1529         * Set true if always require messages to be sync sent
1530         *
1531         * @param alwaysSyncSend
1532         */
1533        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1534            this.alwaysSyncSend = alwaysSyncSend;
1535        }
1536    
1537        /**
1538         * @return the messagePrioritySupported
1539         */
1540        public boolean isMessagePrioritySupported() {
1541            return this.messagePrioritySupported;
1542        }
1543    
1544        /**
1545         * @param messagePrioritySupported the messagePrioritySupported to set
1546         */
1547        public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1548            this.messagePrioritySupported = messagePrioritySupported;
1549        }
1550    
1551        /**
1552         * Cleans up this connection so that it's state is as if the connection was
1553         * just created. This allows the Resource Adapter to clean up a connection
1554         * so that it can be reused without having to close and recreate the
1555         * connection.
1556         */
1557        public void cleanup() throws JMSException {
1558    
1559            if (advisoryConsumer != null && !isTransportFailed()) {
1560                advisoryConsumer.dispose();
1561                advisoryConsumer = null;
1562            }
1563    
1564            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1565                ActiveMQSession s = i.next();
1566                s.dispose();
1567            }
1568            for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1569                ActiveMQConnectionConsumer c = i.next();
1570                c.dispose();
1571            }
1572            for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1573                ActiveMQInputStream c = i.next();
1574                c.dispose();
1575            }
1576            for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1577                ActiveMQOutputStream c = i.next();
1578                c.dispose();
1579            }
1580    
1581            if (isConnectionInfoSentToBroker) {
1582                if (!transportFailed.get() && !closing.get()) {
1583                    syncSendPacket(info.createRemoveCommand());
1584                }
1585                isConnectionInfoSentToBroker = false;
1586            }
1587            if (userSpecifiedClientID) {
1588                info.setClientId(null);
1589                userSpecifiedClientID = false;
1590            }
1591            clientIDSet = false;
1592    
1593            started.set(false);
1594        }
1595    
1596        public void finalize() throws Throwable{
1597            Scheduler s = this.scheduler;
1598            if (s != null){
1599                s.stop();
1600            }
1601        }
1602    
1603        /**
1604         * Changes the associated username/password that is associated with this
1605         * connection. If the connection has been used, you must called cleanup()
1606         * before calling this method.
1607         *
1608         * @throws IllegalStateException if the connection is in used.
1609         */
1610        public void changeUserInfo(String userName, String password) throws JMSException {
1611            if (isConnectionInfoSentToBroker) {
1612                throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1613            }
1614            this.info.setUserName(userName);
1615            this.info.setPassword(password);
1616        }
1617    
1618        /**
1619         * @return Returns the resourceManagerId.
1620         * @throws JMSException
1621         */
1622        public String getResourceManagerId() throws JMSException {
1623            waitForBrokerInfo();
1624            if (brokerInfo == null) {
1625                throw new JMSException("Connection failed before Broker info was received.");
1626            }
1627            return brokerInfo.getBrokerId().getValue();
1628        }
1629    
1630        /**
1631         * Returns the broker name if one is available or null if one is not
1632         * available yet.
1633         */
1634        public String getBrokerName() {
1635            try {
1636                brokerInfoReceived.await(5, TimeUnit.SECONDS);
1637                if (brokerInfo == null) {
1638                    return null;
1639                }
1640                return brokerInfo.getBrokerName();
1641            } catch (InterruptedException e) {
1642                Thread.currentThread().interrupt();
1643                return null;
1644            }
1645        }
1646    
1647        /**
1648         * Returns the broker information if it is available or null if it is not
1649         * available yet.
1650         */
1651        public BrokerInfo getBrokerInfo() {
1652            return brokerInfo;
1653        }
1654    
1655        /**
1656         * @return Returns the RedeliveryPolicy.
1657         * @throws JMSException
1658         */
1659        public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1660            return redeliveryPolicyMap.getDefaultEntry();
1661        }
1662    
1663        /**
1664         * Sets the redelivery policy to be used when messages are rolled back
1665         */
1666        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1667            this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1668        }
1669    
1670        public BlobTransferPolicy getBlobTransferPolicy() {
1671            if (blobTransferPolicy == null) {
1672                blobTransferPolicy = createBlobTransferPolicy();
1673            }
1674            return blobTransferPolicy;
1675        }
1676    
1677        /**
1678         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1679         * OBjects) are transferred from producers to brokers to consumers
1680         */
1681        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1682            this.blobTransferPolicy = blobTransferPolicy;
1683        }
1684    
1685        /**
1686         * @return Returns the alwaysSessionAsync.
1687         */
1688        public boolean isAlwaysSessionAsync() {
1689            return alwaysSessionAsync;
1690        }
1691    
1692        /**
1693         * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1694         * the Connection. However, a separate thread is always used if there is more than one session, or the session
1695         * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1696         * happens asynchronously.
1697         */
1698        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1699            this.alwaysSessionAsync = alwaysSessionAsync;
1700        }
1701    
1702        /**
1703         * @return Returns the optimizeAcknowledge.
1704         */
1705        public boolean isOptimizeAcknowledge() {
1706            return optimizeAcknowledge;
1707        }
1708    
1709        /**
1710         * Enables an optimised acknowledgement mode where messages are acknowledged
1711         * in batches rather than individually
1712         *
1713         * @param optimizeAcknowledge The optimizeAcknowledge to set.
1714         */
1715        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1716            this.optimizeAcknowledge = optimizeAcknowledge;
1717        }
1718    
1719        /**
1720         * The max time in milliseconds between optimized ack batches
1721         * @param optimizeAcknowledgeTimeOut
1722         */
1723        public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1724            this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1725        }
1726    
1727        public long getOptimizeAcknowledgeTimeOut() {
1728            return optimizeAcknowledgeTimeOut;
1729        }
1730    
1731        public long getWarnAboutUnstartedConnectionTimeout() {
1732            return warnAboutUnstartedConnectionTimeout;
1733        }
1734    
1735        /**
1736         * Enables the timeout from a connection creation to when a warning is
1737         * generated if the connection is not properly started via {@link #start()}
1738         * and a message is received by a consumer. It is a very common gotcha to
1739         * forget to <a
1740         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1741         * the connection</a> so this option makes the default case to create a
1742         * warning if the user forgets. To disable the warning just set the value to <
1743         * 0 (say -1).
1744         */
1745        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1746            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1747        }
1748    
1749        /**
1750         * @return the sendTimeout
1751         */
1752        public int getSendTimeout() {
1753            return sendTimeout;
1754        }
1755    
1756        /**
1757         * @param sendTimeout the sendTimeout to set
1758         */
1759        public void setSendTimeout(int sendTimeout) {
1760            this.sendTimeout = sendTimeout;
1761        }
1762    
1763        /**
1764         * @return the sendAcksAsync
1765         */
1766        public boolean isSendAcksAsync() {
1767            return sendAcksAsync;
1768        }
1769    
1770        /**
1771         * @param sendAcksAsync the sendAcksAsync to set
1772         */
1773        public void setSendAcksAsync(boolean sendAcksAsync) {
1774            this.sendAcksAsync = sendAcksAsync;
1775        }
1776    
1777        /**
1778         * Returns the time this connection was created
1779         */
1780        public long getTimeCreated() {
1781            return timeCreated;
1782        }
1783    
1784        private void waitForBrokerInfo() throws JMSException {
1785            try {
1786                brokerInfoReceived.await();
1787            } catch (InterruptedException e) {
1788                Thread.currentThread().interrupt();
1789                throw JMSExceptionSupport.create(e);
1790            }
1791        }
1792    
1793        // Package protected so that it can be used in unit tests
1794        public Transport getTransport() {
1795            return transport;
1796        }
1797    
1798        public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1799            producers.put(producerId, producer);
1800        }
1801    
1802        public void removeProducer(ProducerId producerId) {
1803            producers.remove(producerId);
1804        }
1805    
1806        public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1807            dispatchers.put(consumerId, dispatcher);
1808        }
1809    
1810        public void removeDispatcher(ConsumerId consumerId) {
1811            dispatchers.remove(consumerId);
1812        }
1813    
1814        /**
1815         * @param o - the command to consume
1816         */
1817        public void onCommand(final Object o) {
1818            final Command command = (Command)o;
1819            if (!closed.get() && command != null) {
1820                try {
1821                    command.visit(new CommandVisitorAdapter() {
1822                        @Override
1823                        public Response processMessageDispatch(MessageDispatch md) throws Exception {
1824                            waitForTransportInterruptionProcessingToComplete();
1825                            ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1826                            if (dispatcher != null) {
1827                                // Copy in case a embedded broker is dispatching via
1828                                // vm://
1829                                // md.getMessage() == null to signal end of queue
1830                                // browse.
1831                                Message msg = md.getMessage();
1832                                if (msg != null) {
1833                                    msg = msg.copy();
1834                                    msg.setReadOnlyBody(true);
1835                                    msg.setReadOnlyProperties(true);
1836                                    msg.setRedeliveryCounter(md.getRedeliveryCounter());
1837                                    msg.setConnection(ActiveMQConnection.this);
1838                                    md.setMessage(msg);
1839                                }
1840                                dispatcher.dispatch(md);
1841                            }
1842                            return null;
1843                        }
1844    
1845                        @Override
1846                        public Response processProducerAck(ProducerAck pa) throws Exception {
1847                            if (pa != null && pa.getProducerId() != null) {
1848                                ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1849                                if (producer != null) {
1850                                    producer.onProducerAck(pa);
1851                                }
1852                            }
1853                            return null;
1854                        }
1855    
1856                        @Override
1857                        public Response processBrokerInfo(BrokerInfo info) throws Exception {
1858                            brokerInfo = info;
1859                            brokerInfoReceived.countDown();
1860                            optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1861                            getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1862                            return null;
1863                        }
1864    
1865                        @Override
1866                        public Response processConnectionError(final ConnectionError error) throws Exception {
1867                            executor.execute(new Runnable() {
1868                                public void run() {
1869                                    onAsyncException(error.getException());
1870                                }
1871                            });
1872                            return null;
1873                        }
1874    
1875                        @Override
1876                        public Response processControlCommand(ControlCommand command) throws Exception {
1877                            onControlCommand(command);
1878                            return null;
1879                        }
1880    
1881                        @Override
1882                        public Response processConnectionControl(ConnectionControl control) throws Exception {
1883                            onConnectionControl((ConnectionControl)command);
1884                            return null;
1885                        }
1886    
1887                        @Override
1888                        public Response processConsumerControl(ConsumerControl control) throws Exception {
1889                            onConsumerControl((ConsumerControl)command);
1890                            return null;
1891                        }
1892    
1893                        @Override
1894                        public Response processWireFormat(WireFormatInfo info) throws Exception {
1895                            onWireFormatInfo((WireFormatInfo)command);
1896                            return null;
1897                        }
1898                    });
1899                } catch (Exception e) {
1900                    onClientInternalException(e);
1901                }
1902            }
1903    
1904            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1905                TransportListener listener = iter.next();
1906                listener.onCommand(command);
1907            }
1908        }
1909    
1910        protected void onWireFormatInfo(WireFormatInfo info) {
1911            protocolVersion.set(info.getVersion());
1912        }
1913    
1914        /**
1915         * Handles async client internal exceptions.
1916         * A client internal exception is usually one that has been thrown
1917         * by a container runtime component during asynchronous processing of a
1918         * message that does not affect the connection itself.
1919         * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1920         * its <code>onException</code> method, if one has been registered with this connection.
1921         *
1922         * @param error the exception that the problem
1923         */
1924        public void onClientInternalException(final Throwable error) {
1925            if ( !closed.get() && !closing.get() ) {
1926                if ( this.clientInternalExceptionListener != null ) {
1927                    executor.execute(new Runnable() {
1928                        public void run() {
1929                            ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1930                        }
1931                    });
1932                } else {
1933                    LOG.debug("Async client internal exception occurred with no exception listener registered: "
1934                            + error, error);
1935                }
1936            }
1937        }
1938    
1939        /**
1940         * Used for handling async exceptions
1941         *
1942         * @param error
1943         */
1944        public void onAsyncException(Throwable error) {
1945            if (!closed.get() && !closing.get()) {
1946                if (this.exceptionListener != null) {
1947    
1948                    if (!(error instanceof JMSException)) {
1949                        error = JMSExceptionSupport.create(error);
1950                    }
1951                    final JMSException e = (JMSException)error;
1952    
1953                    executor.execute(new Runnable() {
1954                        public void run() {
1955                            ActiveMQConnection.this.exceptionListener.onException(e);
1956                        }
1957                    });
1958    
1959                } else {
1960                    LOG.debug("Async exception with no exception listener: " + error, error);
1961                }
1962            }
1963        }
1964    
1965        public void onException(final IOException error) {
1966            onAsyncException(error);
1967            if (!closing.get() && !closed.get()) {
1968                executor.execute(new Runnable() {
1969                    public void run() {
1970                        transportFailed(error);
1971                        ServiceSupport.dispose(ActiveMQConnection.this.transport);
1972                        brokerInfoReceived.countDown();
1973                        try {
1974                            cleanup();
1975                        } catch (JMSException e) {
1976                            LOG.warn("Exception during connection cleanup, " + e, e);
1977                        }
1978                        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1979                            TransportListener listener = iter.next();
1980                            listener.onException(error);
1981                        }
1982                    }
1983                });
1984            }
1985        }
1986    
1987        public void transportInterupted() {
1988            this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1989            if (LOG.isDebugEnabled()) {
1990                LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1991            }
1992            signalInterruptionProcessingNeeded();
1993    
1994            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1995                ActiveMQSession s = i.next();
1996                s.clearMessagesInProgress();
1997            }
1998    
1999            for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
2000                connectionConsumer.clearMessagesInProgress();
2001            }
2002    
2003            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2004                TransportListener listener = iter.next();
2005                listener.transportInterupted();
2006            }
2007        }
2008    
2009        public void transportResumed() {
2010            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2011                TransportListener listener = iter.next();
2012                listener.transportResumed();
2013            }
2014        }
2015    
2016        /**
2017         * Create the DestinationInfo object for the temporary destination.
2018         *
2019         * @param topic - if its true topic, else queue.
2020         * @return DestinationInfo
2021         * @throws JMSException
2022         */
2023        protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2024    
2025            // Check if Destination info is of temporary type.
2026            ActiveMQTempDestination dest;
2027            if (topic) {
2028                dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2029            } else {
2030                dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2031            }
2032    
2033            DestinationInfo info = new DestinationInfo();
2034            info.setConnectionId(this.info.getConnectionId());
2035            info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2036            info.setDestination(dest);
2037            syncSendPacket(info);
2038    
2039            dest.setConnection(this);
2040            activeTempDestinations.put(dest, dest);
2041            return dest;
2042        }
2043    
2044        /**
2045         * @param destination
2046         * @throws JMSException
2047         */
2048        public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2049    
2050            checkClosedOrFailed();
2051    
2052            for (ActiveMQSession session : this.sessions) {
2053                if (session.isInUse(destination)) {
2054                    throw new JMSException("A consumer is consuming from the temporary destination");
2055                }
2056            }
2057    
2058            activeTempDestinations.remove(destination);
2059    
2060            DestinationInfo destInfo = new DestinationInfo();
2061            destInfo.setConnectionId(this.info.getConnectionId());
2062            destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2063            destInfo.setDestination(destination);
2064            destInfo.setTimeout(0);
2065            syncSendPacket(destInfo);
2066        }
2067    
2068        public boolean isDeleted(ActiveMQDestination dest) {
2069    
2070            // If we are not watching the advisories.. then
2071            // we will assume that the temp destination does exist.
2072            if (advisoryConsumer == null) {
2073                return false;
2074            }
2075    
2076            return !activeTempDestinations.contains(dest);
2077        }
2078    
2079        public boolean isCopyMessageOnSend() {
2080            return copyMessageOnSend;
2081        }
2082    
2083        public LongSequenceGenerator getLocalTransactionIdGenerator() {
2084            return localTransactionIdGenerator;
2085        }
2086    
2087        public boolean isUseCompression() {
2088            return useCompression;
2089        }
2090    
2091        /**
2092         * Enables the use of compression of the message bodies
2093         */
2094        public void setUseCompression(boolean useCompression) {
2095            this.useCompression = useCompression;
2096        }
2097    
2098        public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2099    
2100            checkClosedOrFailed();
2101            ensureConnectionInfoSent();
2102    
2103            DestinationInfo info = new DestinationInfo();
2104            info.setConnectionId(this.info.getConnectionId());
2105            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2106            info.setDestination(destination);
2107            info.setTimeout(0);
2108            syncSendPacket(info);
2109        }
2110    
2111        public boolean isDispatchAsync() {
2112            return dispatchAsync;
2113        }
2114    
2115        /**
2116         * Enables or disables the default setting of whether or not consumers have
2117         * their messages <a
2118         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2119         * synchronously or asynchronously by the broker</a>. For non-durable
2120         * topics for example we typically dispatch synchronously by default to
2121         * minimize context switches which boost performance. However sometimes its
2122         * better to go slower to ensure that a single blocked consumer socket does
2123         * not block delivery to other consumers.
2124         *
2125         * @param asyncDispatch If true then consumers created on this connection
2126         *                will default to having their messages dispatched
2127         *                asynchronously. The default value is true.
2128         */
2129        public void setDispatchAsync(boolean asyncDispatch) {
2130            this.dispatchAsync = asyncDispatch;
2131        }
2132    
2133        public boolean isObjectMessageSerializationDefered() {
2134            return objectMessageSerializationDefered;
2135        }
2136    
2137        /**
2138         * When an object is set on an ObjectMessage, the JMS spec requires the
2139         * object to be serialized by that set method. Enabling this flag causes the
2140         * object to not get serialized. The object may subsequently get serialized
2141         * if the message needs to be sent over a socket or stored to disk.
2142         */
2143        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2144            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2145        }
2146    
2147        public InputStream createInputStream(Destination dest) throws JMSException {
2148            return createInputStream(dest, null);
2149        }
2150    
2151        public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2152            return createInputStream(dest, messageSelector, false);
2153        }
2154    
2155        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2156            return createInputStream(dest, messageSelector, noLocal,  -1);
2157        }
2158    
2159        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2160            return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2161        }
2162    
2163        public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2164            return createInputStream(dest, null, false);
2165        }
2166    
2167        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2168            return createDurableInputStream(dest, name, messageSelector, false);
2169        }
2170    
2171        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2172            return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2173        }
2174    
2175        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2176            return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2177        }
2178    
2179        private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2180            checkClosedOrFailed();
2181            ensureConnectionInfoSent();
2182            return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2183        }
2184    
2185        /**
2186         * Creates a persistent output stream; individual messages will be written
2187         * to disk/database by the broker
2188         */
2189        public OutputStream createOutputStream(Destination dest) throws JMSException {
2190            return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2191        }
2192    
2193        /**
2194         * Creates a non persistent output stream; messages will not be written to
2195         * disk
2196         */
2197        public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2198            return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2199        }
2200    
2201        /**
2202         * Creates an output stream allowing full control over the delivery mode,
2203         * the priority and time to live of the messages and the properties added to
2204         * messages on the stream.
2205         *
2206         * @param streamProperties defines a map of key-value pairs where the keys
2207         *                are strings and the values are primitive values (numbers
2208         *                and strings) which are appended to the messages similarly
2209         *                to using the
2210         *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2211         *                method
2212         */
2213        public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2214            checkClosedOrFailed();
2215            ensureConnectionInfoSent();
2216            return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2217        }
2218    
2219        /**
2220         * Unsubscribes a durable subscription that has been created by a client.
2221         * <P>
2222         * This method deletes the state being maintained on behalf of the
2223         * subscriber by its provider.
2224         * <P>
2225         * It is erroneous for a client to delete a durable subscription while there
2226         * is an active <CODE>MessageConsumer </CODE> or
2227         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2228         * message is part of a pending transaction or has not been acknowledged in
2229         * the session.
2230         *
2231         * @param name the name used to identify this subscription
2232         * @throws JMSException if the session fails to unsubscribe to the durable
2233         *                 subscription due to some internal error.
2234         * @throws InvalidDestinationException if an invalid subscription name is
2235         *                 specified.
2236         * @since 1.1
2237         */
2238        public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2239            checkClosedOrFailed();
2240            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2241            rsi.setConnectionId(getConnectionInfo().getConnectionId());
2242            rsi.setSubscriptionName(name);
2243            rsi.setClientId(getConnectionInfo().getClientId());
2244            syncSendPacket(rsi);
2245        }
2246    
2247        /**
2248         * Internal send method optimized: - It does not copy the message - It can
2249         * only handle ActiveMQ messages. - You can specify if the send is async or
2250         * sync - Does not allow you to send /w a transaction.
2251         */
2252        void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2253            checkClosedOrFailed();
2254    
2255            if (destination.isTemporary() && isDeleted(destination)) {
2256                throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2257            }
2258    
2259            msg.setJMSDestination(destination);
2260            msg.setJMSDeliveryMode(deliveryMode);
2261            long expiration = 0L;
2262    
2263            if (!isDisableTimeStampsByDefault()) {
2264                long timeStamp = System.currentTimeMillis();
2265                msg.setJMSTimestamp(timeStamp);
2266                if (timeToLive > 0) {
2267                    expiration = timeToLive + timeStamp;
2268                }
2269            }
2270    
2271            msg.setJMSExpiration(expiration);
2272            msg.setJMSPriority(priority);
2273            msg.setJMSRedelivered(false);
2274            msg.setMessageId(messageId);
2275            msg.onSend();
2276            msg.setProducerId(msg.getMessageId().getProducerId());
2277    
2278            if (LOG.isDebugEnabled()) {
2279                LOG.debug("Sending message: " + msg);
2280            }
2281    
2282            if (async) {
2283                asyncSendPacket(msg);
2284            } else {
2285                syncSendPacket(msg);
2286            }
2287        }
2288    
2289        public void addOutputStream(ActiveMQOutputStream stream) {
2290            outputStreams.add(stream);
2291        }
2292    
2293        public void removeOutputStream(ActiveMQOutputStream stream) {
2294            outputStreams.remove(stream);
2295        }
2296    
2297        public void addInputStream(ActiveMQInputStream stream) {
2298            inputStreams.add(stream);
2299        }
2300    
2301        public void removeInputStream(ActiveMQInputStream stream) {
2302            inputStreams.remove(stream);
2303        }
2304    
2305        protected void onControlCommand(ControlCommand command) {
2306            String text = command.getCommand();
2307            if (text != null) {
2308                if ("shutdown".equals(text)) {
2309                    LOG.info("JVM told to shutdown");
2310                    System.exit(0);
2311                }
2312    
2313                // TODO Should we handle the "close" case?
2314                // if (false && "close".equals(text)){
2315                //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2316                //     try {
2317                //         close();
2318                //     } catch (JMSException e) {
2319                //     }
2320                // }
2321            }
2322        }
2323    
2324        protected void onConnectionControl(ConnectionControl command) {
2325            if (command.isFaultTolerant()) {
2326                this.optimizeAcknowledge = false;
2327                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2328                    ActiveMQSession s = i.next();
2329                    s.setOptimizeAcknowledge(false);
2330                }
2331            }
2332        }
2333    
2334        protected void onConsumerControl(ConsumerControl command) {
2335            if (command.isClose()) {
2336                for (ActiveMQSession session : this.sessions) {
2337                    session.close(command.getConsumerId());
2338                }
2339            } else {
2340                for (ActiveMQSession session : this.sessions) {
2341                    session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2342                }
2343            }
2344        }
2345    
2346        protected void transportFailed(IOException error) {
2347            transportFailed.set(true);
2348            if (firstFailureError == null) {
2349                firstFailureError = error;
2350            }
2351        }
2352    
2353        /**
2354         * Should a JMS message be copied to a new JMS Message object as part of the
2355         * send() method in JMS. This is enabled by default to be compliant with the
2356         * JMS specification. You can disable it if you do not mutate JMS messages
2357         * after they are sent for a performance boost
2358         */
2359        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2360            this.copyMessageOnSend = copyMessageOnSend;
2361        }
2362    
2363        @Override
2364        public String toString() {
2365            return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2366        }
2367    
2368        protected BlobTransferPolicy createBlobTransferPolicy() {
2369            return new BlobTransferPolicy();
2370        }
2371    
2372        public int getProtocolVersion() {
2373            return protocolVersion.get();
2374        }
2375    
2376        public int getProducerWindowSize() {
2377            return producerWindowSize;
2378        }
2379    
2380        public void setProducerWindowSize(int producerWindowSize) {
2381            this.producerWindowSize = producerWindowSize;
2382        }
2383    
2384        public void setAuditDepth(int auditDepth) {
2385            connectionAudit.setAuditDepth(auditDepth);
2386        }
2387    
2388        public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2389            connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2390        }
2391    
2392        protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2393            connectionAudit.removeDispatcher(dispatcher);
2394        }
2395    
2396        protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2397            return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2398        }
2399    
2400        protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2401            connectionAudit.rollbackDuplicate(dispatcher, message);
2402        }
2403    
2404        public IOException getFirstFailureError() {
2405            return firstFailureError;
2406        }
2407    
2408        protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2409            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2410            if (cdl != null) {
2411                if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2412                    LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2413                    cdl.await(10, TimeUnit.SECONDS);
2414                }
2415                signalInterruptionProcessingComplete();
2416            }
2417        }
2418    
2419        protected void transportInterruptionProcessingComplete() {
2420            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2421            if (cdl != null) {
2422                cdl.countDown();
2423                try {
2424                    signalInterruptionProcessingComplete();
2425                } catch (InterruptedException ignored) {}
2426            }
2427        }
2428    
2429        private void signalInterruptionProcessingComplete() throws InterruptedException {
2430            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2431            if (cdl.getCount()==0) {
2432                if (LOG.isDebugEnabled()) {
2433                    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2434                }
2435                this.transportInterruptionProcessingComplete = null;
2436    
2437                FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2438                if (failoverTransport != null) {
2439                    failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2440                    if (LOG.isDebugEnabled()) {
2441                        LOG.debug("notified failover transport (" + failoverTransport
2442                                + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2443                    }
2444                }
2445    
2446            }
2447        }
2448    
2449        private void signalInterruptionProcessingNeeded() {
2450            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2451            if (failoverTransport != null) {
2452                failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2453                if (LOG.isDebugEnabled()) {
2454                    LOG.debug("notified failover transport (" + failoverTransport
2455                            + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2456                }
2457            }
2458        }
2459    
2460        /*
2461         * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2462         * will wait to receive re dispatched messages.
2463         * default value is 0 so there is no wait by default.
2464         */
2465        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2466            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2467        }
2468    
2469        public long getConsumerFailoverRedeliveryWaitPeriod() {
2470            return consumerFailoverRedeliveryWaitPeriod;
2471        }
2472    
2473        protected Scheduler getScheduler() throws JMSException {
2474            Scheduler result = scheduler;
2475            if (result == null) {
2476                synchronized (this) {
2477                    result = scheduler;
2478                    if (result == null) {
2479                        checkClosed();
2480                        try {
2481                            result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2482                            scheduler.start();
2483                        } catch(Exception e) {
2484                            throw JMSExceptionSupport.create(e);
2485                        }
2486                    }
2487                }
2488            }
2489            return result;
2490        }
2491    
2492        protected ThreadPoolExecutor getExecutor() {
2493            return this.executor;
2494        }
2495    
2496        /**
2497         * @return the checkForDuplicates
2498         */
2499        public boolean isCheckForDuplicates() {
2500            return this.checkForDuplicates;
2501        }
2502    
2503        /**
2504         * @param checkForDuplicates the checkForDuplicates to set
2505         */
2506        public void setCheckForDuplicates(boolean checkForDuplicates) {
2507            this.checkForDuplicates = checkForDuplicates;
2508        }
2509    
2510        public boolean isTransactedIndividualAck() {
2511            return transactedIndividualAck;
2512        }
2513    
2514        public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2515            this.transactedIndividualAck = transactedIndividualAck;
2516        }
2517    
2518        public boolean isNonBlockingRedelivery() {
2519            return nonBlockingRedelivery;
2520        }
2521    
2522        public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2523            this.nonBlockingRedelivery = nonBlockingRedelivery;
2524        }
2525    
2526        /**
2527         * Removes any TempDestinations that this connection has cached, ignoring
2528         * any exceptions generated because the destination is in use as they should
2529         * not be removed.
2530         */
2531        public void cleanUpTempDestinations() {
2532    
2533            if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2534                return;
2535            }
2536    
2537            Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2538                = this.activeTempDestinations.entrySet().iterator();
2539            while(entries.hasNext()) {
2540                ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2541                try {
2542                    // Only delete this temp destination if it was created from this connection. The connection used
2543                    // for the advisory consumer may also have a reference to this temp destination.
2544                    ActiveMQTempDestination dest = entry.getValue();
2545                    String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2546                    if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2547                        this.deleteTempDestination(entry.getValue());
2548                    }
2549                } catch (Exception ex) {
2550                    // the temp dest is in use so it can not be deleted.
2551                    // it is ok to leave it to connection tear down phase
2552                }
2553            }
2554        }
2555    
2556        /**
2557         * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2558         * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2559         */
2560        public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2561            this.redeliveryPolicyMap = redeliveryPolicyMap;
2562        }
2563    
2564        /**
2565         * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2566         * Consumers when dealing with transaction messages that have been rolled back.
2567         *
2568         * @return the redeliveryPolicyMap
2569         */
2570        public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2571            return redeliveryPolicyMap;
2572        }
2573    
2574        public int getMaxThreadPoolSize() {
2575            return maxThreadPoolSize;
2576        }
2577    
2578        public void setMaxThreadPoolSize(int maxThreadPoolSize) {
2579            this.maxThreadPoolSize = maxThreadPoolSize;
2580        }
2581    
2582        /**
2583         * Enable enforcement of QueueConnection semantics.
2584         *
2585         * @return this object, useful for chaining
2586         */
2587        ActiveMQConnection enforceQueueOnlyConnection() {
2588            this.queueOnlyConnection = true;
2589            return this;
2590        }
2591    
2592        public RejectedExecutionHandler getRejectedTaskHandler() {
2593            return rejectedTaskHandler;
2594        }
2595    
2596        public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
2597            this.rejectedTaskHandler = rejectedTaskHandler;
2598        }
2599    
2600        /**
2601         * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2602         * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
2603         * will not do any background Message acknowledgment.
2604         *
2605         * @return the scheduledOptimizedAckInterval
2606         */
2607        public long getOptimizedAckScheduledAckInterval() {
2608            return optimizedAckScheduledAckInterval;
2609        }
2610    
2611        /**
2612         * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2613         * have been configured with optimizeAcknowledge enabled.
2614         *
2615         * @param scheduledOptimizedAckInterval the scheduledOptimizedAckInterval to set
2616         */
2617        public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2618            this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2619        }
2620    }