001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.LinkedBlockingQueue;
031    import java.util.concurrent.RejectedExecutionHandler;
032    import java.util.concurrent.ThreadFactory;
033    import java.util.concurrent.ThreadPoolExecutor;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.atomic.AtomicBoolean;
036    import java.util.concurrent.atomic.AtomicInteger;
037    
038    import javax.jms.Connection;
039    import javax.jms.ConnectionConsumer;
040    import javax.jms.ConnectionMetaData;
041    import javax.jms.DeliveryMode;
042    import javax.jms.Destination;
043    import javax.jms.ExceptionListener;
044    import javax.jms.IllegalStateException;
045    import javax.jms.InvalidDestinationException;
046    import javax.jms.JMSException;
047    import javax.jms.Queue;
048    import javax.jms.QueueConnection;
049    import javax.jms.QueueSession;
050    import javax.jms.ServerSessionPool;
051    import javax.jms.Session;
052    import javax.jms.Topic;
053    import javax.jms.TopicConnection;
054    import javax.jms.TopicSession;
055    import javax.jms.XAConnection;
056    
057    import org.apache.activemq.advisory.DestinationSource;
058    import org.apache.activemq.blob.BlobTransferPolicy;
059    import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
060    import org.apache.activemq.command.ActiveMQDestination;
061    import org.apache.activemq.command.ActiveMQMessage;
062    import org.apache.activemq.command.ActiveMQTempDestination;
063    import org.apache.activemq.command.ActiveMQTempQueue;
064    import org.apache.activemq.command.ActiveMQTempTopic;
065    import org.apache.activemq.command.BrokerInfo;
066    import org.apache.activemq.command.Command;
067    import org.apache.activemq.command.CommandTypes;
068    import org.apache.activemq.command.ConnectionControl;
069    import org.apache.activemq.command.ConnectionError;
070    import org.apache.activemq.command.ConnectionId;
071    import org.apache.activemq.command.ConnectionInfo;
072    import org.apache.activemq.command.ConsumerControl;
073    import org.apache.activemq.command.ConsumerId;
074    import org.apache.activemq.command.ConsumerInfo;
075    import org.apache.activemq.command.ControlCommand;
076    import org.apache.activemq.command.DestinationInfo;
077    import org.apache.activemq.command.ExceptionResponse;
078    import org.apache.activemq.command.Message;
079    import org.apache.activemq.command.MessageDispatch;
080    import org.apache.activemq.command.MessageId;
081    import org.apache.activemq.command.ProducerAck;
082    import org.apache.activemq.command.ProducerId;
083    import org.apache.activemq.command.RemoveInfo;
084    import org.apache.activemq.command.RemoveSubscriptionInfo;
085    import org.apache.activemq.command.Response;
086    import org.apache.activemq.command.SessionId;
087    import org.apache.activemq.command.ShutdownInfo;
088    import org.apache.activemq.command.WireFormatInfo;
089    import org.apache.activemq.management.JMSConnectionStatsImpl;
090    import org.apache.activemq.management.JMSStatsImpl;
091    import org.apache.activemq.management.StatsCapable;
092    import org.apache.activemq.management.StatsImpl;
093    import org.apache.activemq.state.CommandVisitorAdapter;
094    import org.apache.activemq.thread.Scheduler;
095    import org.apache.activemq.thread.TaskRunnerFactory;
096    import org.apache.activemq.transport.FutureResponse;
097    import org.apache.activemq.transport.ResponseCallback;
098    import org.apache.activemq.transport.Transport;
099    import org.apache.activemq.transport.TransportListener;
100    import org.apache.activemq.transport.failover.FailoverTransport;
101    import org.apache.activemq.util.IdGenerator;
102    import org.apache.activemq.util.IntrospectionSupport;
103    import org.apache.activemq.util.JMSExceptionSupport;
104    import org.apache.activemq.util.LongSequenceGenerator;
105    import org.apache.activemq.util.ServiceSupport;
106    import org.apache.activemq.util.ThreadPoolUtils;
107    import org.slf4j.Logger;
108    import org.slf4j.LoggerFactory;
109    
110    public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
111    
    // Connection defaults, taken from the matching ActiveMQConnectionFactory constants.
    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
    // Initial value of maxThreadPoolSize for every new connection.
    // NOTE(review): public, static and not final, so any caller can reassign it at runtime.
    public static int DEFAULT_THREAD_POOL_SIZE = 1000;

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);

    // NOTE(review): publicly exposed mutable map; presumably the temporary destinations this
    // connection currently knows about - confirm against the code that populates it.
    public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();

    // Both values are handed to every ActiveMQSession built by createSession().
    protected boolean dispatchAsync=true;
    protected boolean alwaysSessionAsync = true;

    private TaskRunnerFactory sessionTaskRunner;
    // Created in the constructor with exactly one worker thread, named after the transport.
    private final ThreadPoolExecutor executor;

    // Connection state variables
    private final ConnectionInfo info;
    private ExceptionListener exceptionListener;
    private ClientInternalExceptionListener clientInternalExceptionListener;
    // Checked by setClientID() to reject a second explicit client id.
    private boolean clientIDSet;
    // Checked by setClientID() to reject a client id change on a connection that was already used.
    private boolean isConnectionInfoSentToBroker;
    // Set by both setClientID() and setDefaultClientID().
    private boolean userSpecifiedClientID;

    // Configuration options variables
    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    private BlobTransferPolicy blobTransferPolicy;
    private RedeliveryPolicyMap redeliveryPolicyMap;
    private MessageTransformer transformer;

    private boolean disableTimeStampsByDefault;
    private boolean optimizedMessageDispatch = true;
    private boolean copyMessageOnSend = true;
    private boolean useCompression;
    private boolean objectMessageSerializationDefered;
    private boolean useAsyncSend;
    private boolean optimizeAcknowledge;
    private long optimizeAcknowledgeTimeOut = 0;
    private long optimizedAckScheduledAckInterval = 0;
    private boolean nestedMapAndListEnabled = true;
    private boolean useRetroactiveConsumer;
    private boolean exclusiveConsumer;
    private boolean alwaysSyncSend;
    // NOTE(review): units are not stated here; presumably milliseconds - confirm at the point of use.
    private int closeTimeout = 15000;
    private boolean watchTopicAdvisories = true;
    // NOTE(review): presumably milliseconds - confirm at the point of use.
    private long warnAboutUnstartedConnectionTimeout = 500L;
    private int sendTimeout =0;
    private boolean sendAcksAsync=true;
    private boolean checkForDuplicates = true;
    private boolean queueOnlyConnection = false;

    // Collaborators supplied to the constructor (stats is created there).
    private final Transport transport;
    private final IdGenerator clientIdGenerator;
    private final JMSStatsImpl factoryStats;
    private final JMSConnectionStatsImpl stats;

    // Lifecycle flags; start() and stop() flip "started" with compareAndSet.
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
    // The sessions list is also passed to JMSConnectionStatsImpl in the constructor.
    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
    private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
    private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();

    // Maps ConsumerIds to ActiveMQDispatcher objects
    private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
    // Maps ProducerIds to ActiveMQMessageProducer objects
    private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
    // Source of the sequence number used by getNextSessionId().
    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
    // Built in the constructor from the connection id and the fixed value -1.
    private final SessionId connectionSessionId;
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();

    private AdvisoryConsumer advisoryConsumer;
    // One-shot latch; presumably released once the broker's BrokerInfo arrives - confirm in the transport callback.
    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
    private BrokerInfo brokerInfo;
    private IOException firstFailureError;
    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;

    // Assume that protocol is the latest. Change to the actual protocol
    // version when a WireFormatInfo is received.
    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    // Wall-clock time (System.currentTimeMillis()) captured in the constructor.
    private final long timeCreated;
    // Duplicate checking is switched on in the constructor only for fault tolerant transports.
    private final ConnectionAudit connectionAudit = new ConnectionAudit();
    private DestinationSource destinationSource;
    private final Object ensureConnectionInfoSentMutex = new Object();
    private boolean useDedicatedTaskRunner;
    protected volatile CountDownLatch transportInterruptionProcessingComplete;
    private long consumerFailoverRedeliveryWaitPeriod;
    private Scheduler scheduler;
    private boolean messagePrioritySupported = true;
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;

    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
    private RejectedExecutionHandler rejectedTaskHandler = null;
210    
211        /**
212         * Construct an <code>ActiveMQConnection</code>
213         *
214         * @param transport
215         * @param factoryStats
216         * @throws Exception
217         */
218        protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
219    
220            this.transport = transport;
221            this.clientIdGenerator = clientIdGenerator;
222            this.factoryStats = factoryStats;
223    
224            // Configure a single threaded executor who's core thread can timeout if
225            // idle
226            executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
227                @Override
228                public Thread newThread(Runnable r) {
229                    Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
230                    //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
231                    //thread.setDaemon(true);
232                    return thread;
233                }
234            });
235            // asyncConnectionThread.allowCoreThreadTimeOut(true);
236            String uniqueId = connectionIdGenerator.generateId();
237            this.info = new ConnectionInfo(new ConnectionId(uniqueId));
238            this.info.setManageable(true);
239            this.info.setFaultTolerant(transport.isFaultTolerant());
240            this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
241    
242            this.transport.setTransportListener(this);
243    
244            this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
245            this.factoryStats.addConnection(this);
246            this.timeCreated = System.currentTimeMillis();
247            this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
248        }
249    
250        protected void setUserName(String userName) {
251            this.info.setUserName(userName);
252        }
253    
254        protected void setPassword(String password) {
255            this.info.setPassword(password);
256        }
257    
258        /**
259         * A static helper method to create a new connection
260         *
261         * @return an ActiveMQConnection
262         * @throws JMSException
263         */
264        public static ActiveMQConnection makeConnection() throws JMSException {
265            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
266            return (ActiveMQConnection)factory.createConnection();
267        }
268    
269        /**
270         * A static helper method to create a new connection
271         *
272         * @param uri
273         * @return and ActiveMQConnection
274         * @throws JMSException
275         */
276        public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
277            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
278            return (ActiveMQConnection)factory.createConnection();
279        }
280    
281        /**
282         * A static helper method to create a new connection
283         *
284         * @param user
285         * @param password
286         * @param uri
287         * @return an ActiveMQConnection
288         * @throws JMSException
289         */
290        public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
291            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
292            return (ActiveMQConnection)factory.createConnection();
293        }
294    
295        /**
296         * @return a number unique for this connection
297         */
298        public JMSConnectionStatsImpl getConnectionStats() {
299            return stats;
300        }
301    
302        /**
303         * Creates a <CODE>Session</CODE> object.
304         *
305         * @param transacted indicates whether the session is transacted
306         * @param acknowledgeMode indicates whether the consumer or the client will
307         *                acknowledge any messages it receives; ignored if the
308         *                session is transacted. Legal values are
309         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
310         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
311         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
312         * @return a newly created session
313         * @throws JMSException if the <CODE>Connection</CODE> object fails to
314         *                 create a session due to some internal error or lack of
315         *                 support for the specific transaction and acknowledgement
316         *                 mode.
317         * @see Session#AUTO_ACKNOWLEDGE
318         * @see Session#CLIENT_ACKNOWLEDGE
319         * @see Session#DUPS_OK_ACKNOWLEDGE
320         * @since 1.1
321         */
322        @Override
323        public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
324            checkClosedOrFailed();
325            ensureConnectionInfoSent();
326            if(!transacted) {
327                if (acknowledgeMode==Session.SESSION_TRANSACTED) {
328                    throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
329                } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
330                    throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
331                            "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
332                }
333            }
334            return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
335                ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
336        }
337    
338        /**
339         * @return sessionId
340         */
341        protected SessionId getNextSessionId() {
342            return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
343        }
344    
345        /**
346         * Gets the client identifier for this connection.
347         * <P>
348         * This value is specific to the JMS provider. It is either preconfigured by
349         * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
350         * dynamically by the application by calling the <code>setClientID</code>
351         * method.
352         *
353         * @return the unique client identifier
354         * @throws JMSException if the JMS provider fails to return the client ID
355         *                 for this connection due to some internal error.
356         */
357        @Override
358        public String getClientID() throws JMSException {
359            checkClosedOrFailed();
360            return this.info.getClientId();
361        }
362    
363        /**
364         * Sets the client identifier for this connection.
365         * <P>
366         * The preferred way to assign a JMS client's client identifier is for it to
367         * be configured in a client-specific <CODE>ConnectionFactory</CODE>
368         * object and transparently assigned to the <CODE>Connection</CODE> object
369         * it creates.
370         * <P>
371         * Alternatively, a client can set a connection's client identifier using a
372         * provider-specific value. The facility to set a connection's client
373         * identifier explicitly is not a mechanism for overriding the identifier
374         * that has been administratively configured. It is provided for the case
375         * where no administratively specified identifier exists. If one does exist,
376         * an attempt to change it by setting it must throw an
377         * <CODE>IllegalStateException</CODE>. If a client sets the client
378         * identifier explicitly, it must do so immediately after it creates the
379         * connection and before any other action on the connection is taken. After
380         * this point, setting the client identifier is a programming error that
381         * should throw an <CODE>IllegalStateException</CODE>.
382         * <P>
383         * The purpose of the client identifier is to associate a connection and its
384         * objects with a state maintained on behalf of the client by a provider.
385         * The only such state identified by the JMS API is that required to support
386         * durable subscriptions.
387         * <P>
388         * If another connection with the same <code>clientID</code> is already
389         * running when this method is called, the JMS provider should detect the
390         * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
391         *
392         * @param newClientID the unique client identifier
393         * @throws JMSException if the JMS provider fails to set the client ID for
394         *                 this connection due to some internal error.
395         * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
396         *                 invalid or duplicate client ID.
397         * @throws javax.jms.IllegalStateException if the JMS client attempts to set
398         *                 a connection's client ID at the wrong time or when it has
399         *                 been administratively configured.
400         */
401        @Override
402        public void setClientID(String newClientID) throws JMSException {
403            checkClosedOrFailed();
404    
405            if (this.clientIDSet) {
406                throw new IllegalStateException("The clientID has already been set");
407            }
408    
409            if (this.isConnectionInfoSentToBroker) {
410                throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
411            }
412    
413            this.info.setClientId(newClientID);
414            this.userSpecifiedClientID = true;
415            ensureConnectionInfoSent();
416        }
417    
418        /**
419         * Sets the default client id that the connection will use if explicitly not
420         * set with the setClientId() call.
421         */
422        public void setDefaultClientID(String clientID) throws JMSException {
423            this.info.setClientId(clientID);
424            this.userSpecifiedClientID = true;
425        }
426    
427        /**
428         * Gets the metadata for this connection.
429         *
430         * @return the connection metadata
431         * @throws JMSException if the JMS provider fails to get the connection
432         *                 metadata for this connection.
433         * @see javax.jms.ConnectionMetaData
434         */
435        @Override
436        public ConnectionMetaData getMetaData() throws JMSException {
437            checkClosedOrFailed();
438            return ActiveMQConnectionMetaData.INSTANCE;
439        }
440    
441        /**
442         * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
443         * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
444         * associated with it.
445         *
446         * @return the <CODE>ExceptionListener</CODE> for this connection, or
447         *         null, if no <CODE>ExceptionListener</CODE> is associated with
448         *         this connection.
449         * @throws JMSException if the JMS provider fails to get the
450         *                 <CODE>ExceptionListener</CODE> for this connection.
451         * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
452         */
453        @Override
454        public ExceptionListener getExceptionListener() throws JMSException {
455            checkClosedOrFailed();
456            return this.exceptionListener;
457        }
458    
459        /**
460         * Sets an exception listener for this connection.
461         * <P>
462         * If a JMS provider detects a serious problem with a connection, it informs
463         * the connection's <CODE> ExceptionListener</CODE>, if one has been
464         * registered. It does this by calling the listener's <CODE>onException
465         * </CODE>
466         * method, passing it a <CODE>JMSException</CODE> object describing the
467         * problem.
468         * <P>
469         * An exception listener allows a client to be notified of a problem
470         * asynchronously. Some connections only consume messages, so they would
471         * have no other way to learn their connection has failed.
472         * <P>
473         * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
474         * <P>
475         * A JMS provider should attempt to resolve connection problems itself
476         * before it notifies the client of them.
477         *
478         * @param listener the exception listener
479         * @throws JMSException if the JMS provider fails to set the exception
480         *                 listener for this connection.
481         */
482        @Override
483        public void setExceptionListener(ExceptionListener listener) throws JMSException {
484            checkClosedOrFailed();
485            this.exceptionListener = listener;
486        }
487    
488        /**
489         * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
490         * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
491         * associated with it.
492         *
493         * @return the listener or <code>null</code> if no listener is registered with the connection.
494         */
495        public ClientInternalExceptionListener getClientInternalExceptionListener() {
496            return clientInternalExceptionListener;
497        }
498    
499        /**
500         * Sets a client internal exception listener for this connection.
501         * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
502         * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
503         * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
504         * describing the problem.
505         *
506         * @param listener the exception listener
507         */
508        public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
509            this.clientInternalExceptionListener = listener;
510        }
511    
512        /**
513         * Starts (or restarts) a connection's delivery of incoming messages. A call
514         * to <CODE>start</CODE> on a connection that has already been started is
515         * ignored.
516         *
517         * @throws JMSException if the JMS provider fails to start message delivery
518         *                 due to some internal error.
519         * @see javax.jms.Connection#stop()
520         */
521        @Override
522        public void start() throws JMSException {
523            checkClosedOrFailed();
524            ensureConnectionInfoSent();
525            if (started.compareAndSet(false, true)) {
526                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
527                    ActiveMQSession session = i.next();
528                    session.start();
529                }
530            }
531        }
532    
533        /**
534         * Temporarily stops a connection's delivery of incoming messages. Delivery
535         * can be restarted using the connection's <CODE>start</CODE> method. When
536         * the connection is stopped, delivery to all the connection's message
537         * consumers is inhibited: synchronous receives block, and messages are not
538         * delivered to message listeners.
539         * <P>
540         * This call blocks until receives and/or message listeners in progress have
541         * completed.
542         * <P>
543         * Stopping a connection has no effect on its ability to send messages. A
544         * call to <CODE>stop</CODE> on a connection that has already been stopped
545         * is ignored.
546         * <P>
547         * A call to <CODE>stop</CODE> must not return until delivery of messages
548         * has paused. This means that a client can rely on the fact that none of
549         * its message listeners will be called and that all threads of control
550         * waiting for <CODE>receive</CODE> calls to return will not return with a
551         * message until the connection is restarted. The receive timers for a
552         * stopped connection continue to advance, so receives may time out while
553         * the connection is stopped.
554         * <P>
555         * If message listeners are running when <CODE>stop</CODE> is invoked, the
556         * <CODE>stop</CODE> call must wait until all of them have returned before
557         * it may return. While these message listeners are completing, they must
558         * have the full services of the connection available to them.
559         *
560         * @throws JMSException if the JMS provider fails to stop message delivery
561         *                 due to some internal error.
562         * @see javax.jms.Connection#start()
563         */
564        @Override
565        public void stop() throws JMSException {
566            checkClosedOrFailed();
567            if (started.compareAndSet(true, false)) {
568                synchronized(sessions) {
569                    for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
570                        ActiveMQSession s = i.next();
571                        s.stop();
572                    }
573                }
574            }
575        }
576    
577        /**
578         * Closes the connection.
579         * <P>
580         * Since a provider typically allocates significant resources outside the
581         * JVM on behalf of a connection, clients should close these resources when
582         * they are not needed. Relying on garbage collection to eventually reclaim
583         * these resources may not be timely enough.
584         * <P>
585         * There is no need to close the sessions, producers, and consumers of a
586         * closed connection.
587         * <P>
588         * Closing a connection causes all temporary destinations to be deleted.
589         * <P>
590         * When this method is invoked, it should not return until message
591         * processing has been shut down in an orderly fashion. This means that all
592         * message listeners that may have been running have returned, and that all
593         * pending receives have returned. A close terminates all pending message
594         * receives on the connection's sessions' consumers. The receives may return
595         * with a message or with null, depending on whether there was a message
596         * available at the time of the close. If one or more of the connection's
597         * sessions' message listeners is processing a message at the time when
598         * connection <CODE>close</CODE> is invoked, all the facilities of the
599         * connection and its sessions must remain available to those listeners
600         * until they return control to the JMS provider.
601         * <P>
602         * Closing a connection causes any of its sessions' transactions in progress
603         * to be rolled back. In the case where a session's work is coordinated by
604         * an external transaction manager, a session's <CODE>commit</CODE> and
605         * <CODE> rollback</CODE> methods are not used and the result of a closed
606         * session's work is determined later by the transaction manager. Closing a
607         * connection does NOT force an acknowledgment of client-acknowledged
608         * sessions.
609         * <P>
610         * Invoking the <CODE>acknowledge</CODE> method of a received message from
611         * a closed connection's session must throw an
612         * <CODE>IllegalStateException</CODE>. Closing a closed connection must
613         * NOT throw an exception.
614         *
615         * @throws JMSException if the JMS provider fails to close the connection
616         *                 due to some internal error. For example, a failure to
617         *                 release resources or to close a socket connection can
618         *                 cause this exception to be thrown.
619         */
620        @Override
621        public void close() throws JMSException {
622            // Store the interrupted state and clear so that cleanup happens without
623            // leaking connection resources.  Reset in finally to preserve state.
624            boolean interrupted = Thread.interrupted();
625    
626            try {
627    
628                // If we were running, lets stop first.
629                if (!closed.get() && !transportFailed.get()) {
630                    stop();
631                }
632    
633                synchronized (this) {
634                    if (!closed.get()) {
635                        closing.set(true);
636    
637                        if (destinationSource != null) {
638                            destinationSource.stop();
639                            destinationSource = null;
640                        }
641                        if (advisoryConsumer != null) {
642                            advisoryConsumer.dispose();
643                            advisoryConsumer = null;
644                        }
645    
646                        Scheduler scheduler = this.scheduler;
647                        if (scheduler != null) {
648                            try {
649                                scheduler.stop();
650                            } catch (Exception e) {
651                                JMSException ex =  JMSExceptionSupport.create(e);
652                                throw ex;
653                            }
654                        }
655    
656                        long lastDeliveredSequenceId = 0;
657                        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
658                            ActiveMQSession s = i.next();
659                            s.dispose();
660                            lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
661                        }
662                        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
663                            ActiveMQConnectionConsumer c = i.next();
664                            c.dispose();
665                        }
666                        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
667                            ActiveMQInputStream c = i.next();
668                            c.dispose();
669                        }
670                        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
671                            ActiveMQOutputStream c = i.next();
672                            c.dispose();
673                        }
674    
675                        this.activeTempDestinations.clear();
676    
677                        if (isConnectionInfoSentToBroker) {
678                            // If we announced ourselves to the broker.. Try to let the broker
679                            // know that the connection is being shutdown.
680                            RemoveInfo removeCommand = info.createRemoveCommand();
681                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
682                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
683                            doAsyncSendPacket(new ShutdownInfo());
684                        }
685    
686                        started.set(false);
687    
688                        // TODO if we move the TaskRunnerFactory to the connection
689                        // factory
690                        // then we may need to call
691                        // factory.onConnectionClose(this);
692                        if (sessionTaskRunner != null) {
693                            sessionTaskRunner.shutdown();
694                        }
695                        closed.set(true);
696                        closing.set(false);
697                    }
698                }
699            } finally {
700                try {
701                    if (executor != null) {
702                        ThreadPoolUtils.shutdown(executor);
703                    }
704                } catch (Throwable e) {
705                    LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
706                }
707    
708                ServiceSupport.dispose(this.transport);
709    
710                factoryStats.removeConnection(this);
711                if (interrupted) {
712                    Thread.currentThread().interrupt();
713                }
714            }
715        }
716    
717        /**
718         * Tells the broker to terminate its VM. This can be used to cleanly
719         * terminate a broker running in a standalone java process. Server must have
720         * property enable.vm.shutdown=true defined to allow this to work.
721         */
722        // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
723        // implemented.
724        /*
725         * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
726         * command = new BrokerAdminCommand();
727         * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
728         * asyncSendPacket(command); }
729         */
730    
731        /**
732         * Create a durable connection consumer for this connection (optional
733         * operation). This is an expert facility not used by regular JMS clients.
734         *
735         * @param topic topic to access
736         * @param subscriptionName durable subscription name
737         * @param messageSelector only messages with properties matching the message
738         *                selector expression are delivered. A value of null or an
739         *                empty string indicates that there is no message selector
740         *                for the message consumer.
741         * @param sessionPool the server session pool to associate with this durable
742         *                connection consumer
743         * @param maxMessages the maximum number of messages that can be assigned to
744         *                a server session at one time
745         * @return the durable connection consumer
746         * @throws JMSException if the <CODE>Connection</CODE> object fails to
747         *                 create a connection consumer due to some internal error
748         *                 or invalid arguments for <CODE>sessionPool</CODE> and
749         *                 <CODE>messageSelector</CODE>.
750         * @throws javax.jms.InvalidDestinationException if an invalid destination
751         *                 is specified.
752         * @throws javax.jms.InvalidSelectorException if the message selector is
753         *                 invalid.
754         * @see javax.jms.ConnectionConsumer
755         * @since 1.1
756         */
757        @Override
758        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
759            throws JMSException {
760            return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
761        }
762    
763        /**
764         * Create a durable connection consumer for this connection (optional
765         * operation). This is an expert facility not used by regular JMS clients.
766         *
767         * @param topic topic to access
768         * @param subscriptionName durable subscription name
769         * @param messageSelector only messages with properties matching the message
770         *                selector expression are delivered. A value of null or an
771         *                empty string indicates that there is no message selector
772         *                for the message consumer.
773         * @param sessionPool the server session pool to associate with this durable
774         *                connection consumer
775         * @param maxMessages the maximum number of messages that can be assigned to
776         *                a server session at one time
777         * @param noLocal set true if you want to filter out messages published
778         *                locally
779         * @return the durable connection consumer
780         * @throws JMSException if the <CODE>Connection</CODE> object fails to
781         *                 create a connection consumer due to some internal error
782         *                 or invalid arguments for <CODE>sessionPool</CODE> and
783         *                 <CODE>messageSelector</CODE>.
784         * @throws javax.jms.InvalidDestinationException if an invalid destination
785         *                 is specified.
786         * @throws javax.jms.InvalidSelectorException if the message selector is
787         *                 invalid.
788         * @see javax.jms.ConnectionConsumer
789         * @since 1.1
790         */
791        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
792                                                                  boolean noLocal) throws JMSException {
793            checkClosedOrFailed();
794    
795            if (queueOnlyConnection) {
796                throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
797            }
798    
799            ensureConnectionInfoSent();
800            SessionId sessionId = new SessionId(info.getConnectionId(), -1);
801            ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
802            info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
803            info.setSubscriptionName(subscriptionName);
804            info.setSelector(messageSelector);
805            info.setPrefetchSize(maxMessages);
806            info.setDispatchAsync(isDispatchAsync());
807    
808            // Allows the options on the destination to configure the consumerInfo
809            if (info.getDestination().getOptions() != null) {
810                Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
811                IntrospectionSupport.setProperties(this.info, options, "consumer.");
812            }
813    
814            return new ActiveMQConnectionConsumer(this, sessionPool, info);
815        }
816    
817        // Properties
818        // -------------------------------------------------------------------------
819    
820        /**
821         * Returns true if this connection has been started
822         *
823         * @return true if this Connection is started
824         */
825        public boolean isStarted() {
826            return started.get();
827        }
828    
829        /**
830         * Returns true if the connection is closed
831         */
832        public boolean isClosed() {
833            return closed.get();
834        }
835    
836        /**
837         * Returns true if the connection is in the process of being closed
838         */
839        public boolean isClosing() {
840            return closing.get();
841        }
842    
843        /**
844         * Returns true if the underlying transport has failed
845         */
846        public boolean isTransportFailed() {
847            return transportFailed.get();
848        }
849    
850        /**
851         * @return Returns the prefetchPolicy.
852         */
853        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
854            return prefetchPolicy;
855        }
856    
857        /**
858         * Sets the <a
859         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
860         * policy</a> for consumers created by this connection.
861         */
862        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
863            this.prefetchPolicy = prefetchPolicy;
864        }
865    
866        /**
867         */
868        public Transport getTransportChannel() {
869            return transport;
870        }
871    
872        /**
873         * @return Returns the clientID of the connection, forcing one to be
874         *         generated if one has not yet been configured.
875         */
876        public String getInitializedClientID() throws JMSException {
877            ensureConnectionInfoSent();
878            return info.getClientId();
879        }
880    
881        /**
882         * @return Returns the timeStampsDisableByDefault.
883         */
884        public boolean isDisableTimeStampsByDefault() {
885            return disableTimeStampsByDefault;
886        }
887    
888        /**
889         * Sets whether or not timestamps on messages should be disabled or not. If
890         * you disable them it adds a small performance boost.
891         */
892        public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
893            this.disableTimeStampsByDefault = timeStampsDisableByDefault;
894        }
895    
896        /**
897         * @return Returns the dispatchOptimizedMessage.
898         */
899        public boolean isOptimizedMessageDispatch() {
900            return optimizedMessageDispatch;
901        }
902    
903        /**
904         * If this flag is set then an larger prefetch limit is used - only
905         * applicable for durable topic subscribers.
906         */
907        public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
908            this.optimizedMessageDispatch = dispatchOptimizedMessage;
909        }
910    
911        /**
912         * @return Returns the closeTimeout.
913         */
914        public int getCloseTimeout() {
915            return closeTimeout;
916        }
917    
918        /**
919         * Sets the timeout before a close is considered complete. Normally a
920         * close() on a connection waits for confirmation from the broker; this
921         * allows that operation to timeout to save the client hanging if there is
922         * no broker
923         */
924        public void setCloseTimeout(int closeTimeout) {
925            this.closeTimeout = closeTimeout;
926        }
927    
928        /**
929         * @return ConnectionInfo
930         */
931        public ConnectionInfo getConnectionInfo() {
932            return this.info;
933        }
934    
935        public boolean isUseRetroactiveConsumer() {
936            return useRetroactiveConsumer;
937        }
938    
939        /**
940         * Sets whether or not retroactive consumers are enabled. Retroactive
941         * consumers allow non-durable topic subscribers to receive old messages
942         * that were published before the non-durable subscriber started.
943         */
944        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
945            this.useRetroactiveConsumer = useRetroactiveConsumer;
946        }
947    
948        public boolean isNestedMapAndListEnabled() {
949            return nestedMapAndListEnabled;
950        }
951    
952        /**
953         * Enables/disables whether or not Message properties and MapMessage entries
954         * support <a
955         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
956         * Structures</a> of Map and List objects
957         */
958        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
959            this.nestedMapAndListEnabled = structuredMapsEnabled;
960        }
961    
962        public boolean isExclusiveConsumer() {
963            return exclusiveConsumer;
964        }
965    
966        /**
967         * Enables or disables whether or not queue consumers should be exclusive or
968         * not for example to preserve ordering when not using <a
969         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
970         *
971         * @param exclusiveConsumer
972         */
973        public void setExclusiveConsumer(boolean exclusiveConsumer) {
974            this.exclusiveConsumer = exclusiveConsumer;
975        }
976    
977        /**
978         * Adds a transport listener so that a client can be notified of events in
979         * the underlying transport
980         */
981        public void addTransportListener(TransportListener transportListener) {
982            transportListeners.add(transportListener);
983        }
984    
985        public void removeTransportListener(TransportListener transportListener) {
986            transportListeners.remove(transportListener);
987        }
988    
989        public boolean isUseDedicatedTaskRunner() {
990            return useDedicatedTaskRunner;
991        }
992    
993        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
994            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
995        }
996    
997        public TaskRunnerFactory getSessionTaskRunner() {
998            synchronized (this) {
999                if (sessionTaskRunner == null) {
1000                    sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1001                    sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1002                }
1003            }
1004            return sessionTaskRunner;
1005        }
1006    
1007        public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1008            this.sessionTaskRunner = sessionTaskRunner;
1009        }
1010    
1011        public MessageTransformer getTransformer() {
1012            return transformer;
1013        }
1014    
1015        /**
1016         * Sets the transformer used to transform messages before they are sent on
1017         * to the JMS bus or when they are received from the bus but before they are
1018         * delivered to the JMS client
1019         */
1020        public void setTransformer(MessageTransformer transformer) {
1021            this.transformer = transformer;
1022        }
1023    
1024        /**
1025         * @return the statsEnabled
1026         */
1027        public boolean isStatsEnabled() {
1028            return this.stats.isEnabled();
1029        }
1030    
1031        /**
1032         * @param statsEnabled the statsEnabled to set
1033         */
1034        public void setStatsEnabled(boolean statsEnabled) {
1035            this.stats.setEnabled(statsEnabled);
1036        }
1037    
1038        /**
1039         * Returns the {@link DestinationSource} object which can be used to listen to destinations
1040         * being created or destroyed or to enquire about the current destinations available on the broker
1041         *
1042         * @return a lazily created destination source
1043         * @throws JMSException
1044         */
1045        @Override
1046        public DestinationSource getDestinationSource() throws JMSException {
1047            if (destinationSource == null) {
1048                destinationSource = new DestinationSource(this);
1049                destinationSource.start();
1050            }
1051            return destinationSource;
1052        }
1053    
1054        // Implementation methods
1055        // -------------------------------------------------------------------------
1056    
1057        /**
1058         * Used internally for adding Sessions to the Connection
1059         *
1060         * @param session
1061         * @throws JMSException
1062         * @throws JMSException
1063         */
1064        protected void addSession(ActiveMQSession session) throws JMSException {
1065            this.sessions.add(session);
1066            if (sessions.size() > 1 || session.isTransacted()) {
1067                optimizedMessageDispatch = false;
1068            }
1069        }
1070    
1071        /**
1072         * Used interanlly for removing Sessions from a Connection
1073         *
1074         * @param session
1075         */
1076        protected void removeSession(ActiveMQSession session) {
1077            this.sessions.remove(session);
1078            this.removeDispatcher(session);
1079        }
1080    
1081        /**
1082         * Add a ConnectionConsumer
1083         *
1084         * @param connectionConsumer
1085         * @throws JMSException
1086         */
1087        protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1088            this.connectionConsumers.add(connectionConsumer);
1089        }
1090    
1091        /**
1092         * Remove a ConnectionConsumer
1093         *
1094         * @param connectionConsumer
1095         */
1096        protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1097            this.connectionConsumers.remove(connectionConsumer);
1098            this.removeDispatcher(connectionConsumer);
1099        }
1100    
1101        /**
1102         * Creates a <CODE>TopicSession</CODE> object.
1103         *
1104         * @param transacted indicates whether the session is transacted
1105         * @param acknowledgeMode indicates whether the consumer or the client will
1106         *                acknowledge any messages it receives; ignored if the
1107         *                session is transacted. Legal values are
1108         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1109         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1110         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1111         * @return a newly created topic session
1112         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1113         *                 to create a session due to some internal error or lack of
1114         *                 support for the specific transaction and acknowledgement
1115         *                 mode.
1116         * @see Session#AUTO_ACKNOWLEDGE
1117         * @see Session#CLIENT_ACKNOWLEDGE
1118         * @see Session#DUPS_OK_ACKNOWLEDGE
1119         */
1120        @Override
1121        public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1122            return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1123        }
1124    
1125        /**
1126         * Creates a connection consumer for this connection (optional operation).
1127         * This is an expert facility not used by regular JMS clients.
1128         *
1129         * @param topic the topic to access
1130         * @param messageSelector only messages with properties matching the message
1131         *                selector expression are delivered. A value of null or an
1132         *                empty string indicates that there is no message selector
1133         *                for the message consumer.
1134         * @param sessionPool the server session pool to associate with this
1135         *                connection consumer
1136         * @param maxMessages the maximum number of messages that can be assigned to
1137         *                a server session at one time
1138         * @return the connection consumer
1139         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1140         *                 to create a connection consumer due to some internal
1141         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1142         *                 and <CODE>messageSelector</CODE>.
1143         * @throws javax.jms.InvalidDestinationException if an invalid topic is
1144         *                 specified.
1145         * @throws javax.jms.InvalidSelectorException if the message selector is
1146         *                 invalid.
1147         * @see javax.jms.ConnectionConsumer
1148         */
1149        @Override
1150        public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1151            return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1152        }
1153    
1154        /**
1155         * Creates a connection consumer for this connection (optional operation).
1156         * This is an expert facility not used by regular JMS clients.
1157         *
1158         * @param queue the queue to access
1159         * @param messageSelector only messages with properties matching the message
1160         *                selector expression are delivered. A value of null or an
1161         *                empty string indicates that there is no message selector
1162         *                for the message consumer.
1163         * @param sessionPool the server session pool to associate with this
1164         *                connection consumer
1165         * @param maxMessages the maximum number of messages that can be assigned to
1166         *                a server session at one time
1167         * @return the connection consumer
1168         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1169         *                 to create a connection consumer due to some internal
1170         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1171         *                 and <CODE>messageSelector</CODE>.
1172         * @throws javax.jms.InvalidDestinationException if an invalid queue is
1173         *                 specified.
1174         * @throws javax.jms.InvalidSelectorException if the message selector is
1175         *                 invalid.
1176         * @see javax.jms.ConnectionConsumer
1177         */
1178        @Override
1179        public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1180            return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1181        }
1182    
1183        /**
1184         * Creates a connection consumer for this connection (optional operation).
1185         * This is an expert facility not used by regular JMS clients.
1186         *
1187         * @param destination the destination to access
1188         * @param messageSelector only messages with properties matching the message
1189         *                selector expression are delivered. A value of null or an
1190         *                empty string indicates that there is no message selector
1191         *                for the message consumer.
1192         * @param sessionPool the server session pool to associate with this
1193         *                connection consumer
1194         * @param maxMessages the maximum number of messages that can be assigned to
1195         *                a server session at one time
1196         * @return the connection consumer
1197         * @throws JMSException if the <CODE>Connection</CODE> object fails to
1198         *                 create a connection consumer due to some internal error
1199         *                 or invalid arguments for <CODE>sessionPool</CODE> and
1200         *                 <CODE>messageSelector</CODE>.
1201         * @throws javax.jms.InvalidDestinationException if an invalid destination
1202         *                 is specified.
1203         * @throws javax.jms.InvalidSelectorException if the message selector is
1204         *                 invalid.
1205         * @see javax.jms.ConnectionConsumer
1206         * @since 1.1
1207         */
1208        @Override
1209        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1210            return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1211        }
1212    
1213        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1214            throws JMSException {
1215    
1216            checkClosedOrFailed();
1217            ensureConnectionInfoSent();
1218    
1219            ConsumerId consumerId = createConsumerId();
1220            ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1221            consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1222            consumerInfo.setSelector(messageSelector);
1223            consumerInfo.setPrefetchSize(maxMessages);
1224            consumerInfo.setNoLocal(noLocal);
1225            consumerInfo.setDispatchAsync(isDispatchAsync());
1226    
1227            // Allows the options on the destination to configure the consumerInfo
1228            if (consumerInfo.getDestination().getOptions() != null) {
1229                Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1230                IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1231            }
1232    
1233            return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1234        }
1235    
1236        /**
1237         * @return
1238         */
1239        private ConsumerId createConsumerId() {
1240            return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1241        }
1242    
1243        /**
1244         * @return
1245         */
1246        private ProducerId createProducerId() {
1247            return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1248        }
1249    
1250        /**
1251         * Creates a <CODE>QueueSession</CODE> object.
1252         *
1253         * @param transacted indicates whether the session is transacted
1254         * @param acknowledgeMode indicates whether the consumer or the client will
1255         *                acknowledge any messages it receives; ignored if the
1256         *                session is transacted. Legal values are
1257         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1258         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1259         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1260         * @return a newly created queue session
1261         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1262         *                 to create a session due to some internal error or lack of
1263         *                 support for the specific transaction and acknowledgement
1264         *                 mode.
1265         * @see Session#AUTO_ACKNOWLEDGE
1266         * @see Session#CLIENT_ACKNOWLEDGE
1267         * @see Session#DUPS_OK_ACKNOWLEDGE
1268         */
1269        @Override
1270        public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1271            return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1272        }
1273    
1274        /**
1275         * Ensures that the clientID was manually specified and not auto-generated.
1276         * If the clientID was not specified this method will throw an exception.
1277         * This method is used to ensure that the clientID + durableSubscriber name
1278         * are used correctly.
1279         *
1280         * @throws JMSException
1281         */
1282        public void checkClientIDWasManuallySpecified() throws JMSException {
1283            if (!userSpecifiedClientID) {
1284                throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1285            }
1286        }
1287    
1288        /**
1289         * send a Packet through the Connection - for internal use only
1290         *
1291         * @param command
1292         * @throws JMSException
1293         */
1294        public void asyncSendPacket(Command command) throws JMSException {
1295            if (isClosed()) {
1296                throw new ConnectionClosedException();
1297            } else {
1298                doAsyncSendPacket(command);
1299            }
1300        }
1301    
1302        private void doAsyncSendPacket(Command command) throws JMSException {
1303            try {
1304                this.transport.oneway(command);
1305            } catch (IOException e) {
1306                throw JMSExceptionSupport.create(e);
1307            }
1308        }
1309    
1310        /**
1311         * Send a packet through a Connection - for internal use only
1312         *
1313         * @param command
1314         * @return
1315         * @throws JMSException
1316         */
1317        public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1318            if(onComplete==null) {
1319                syncSendPacket(command);
1320            } else {
1321                if (isClosed()) {
1322                    throw new ConnectionClosedException();
1323                }
1324                try {
1325                    this.transport.asyncRequest(command, new ResponseCallback() {
1326                        @Override
1327                        public void onCompletion(FutureResponse resp) {
1328                            Response response;
1329                            Throwable exception = null;
1330                            try {
1331                                response = resp.getResult();
1332                                if (response.isException()) {
1333                                    ExceptionResponse er = (ExceptionResponse)response;
1334                                    exception = er.getException();
1335                                }
1336                            } catch (Exception e) {
1337                                exception = e;
1338                            }
1339                            if(exception!=null) {
1340                                if ( exception instanceof JMSException) {
1341                                    onComplete.onException((JMSException) exception);
1342                                } else {
1343                                    if (isClosed()||closing.get()) {
1344                                        LOG.debug("Received an exception but connection is closing");
1345                                    }
1346                                    JMSException jmsEx = null;
1347                                    try {
1348                                        jmsEx = JMSExceptionSupport.create(exception);
1349                                    } catch(Throwable e) {
1350                                        LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1351                                    }
1352                                    // dispose of transport for security exceptions on connection initiation
1353                                    if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1354                                        Transport t = transport;
1355                                        if (null != t){
1356                                            ServiceSupport.dispose(t);
1357                                        }
1358                                    }
1359                                    if (jmsEx !=null) {
1360                                        onComplete.onException(jmsEx);
1361                                    }
1362                                }
1363                            } else {
1364                                onComplete.onSuccess();
1365                            }
1366                        }
1367                    });
1368                } catch (IOException e) {
1369                    throw JMSExceptionSupport.create(e);
1370                }
1371            }
1372        }
1373    
1374        public Response syncSendPacket(Command command) throws JMSException {
1375            if (isClosed()) {
1376                throw new ConnectionClosedException();
1377            } else {
1378    
1379                try {
1380                    Response response = (Response)this.transport.request(command);
1381                    if (response.isException()) {
1382                        ExceptionResponse er = (ExceptionResponse)response;
1383                        if (er.getException() instanceof JMSException) {
1384                            throw (JMSException)er.getException();
1385                        } else {
1386                            if (isClosed()||closing.get()) {
1387                                LOG.debug("Received an exception but connection is closing");
1388                            }
1389                            JMSException jmsEx = null;
1390                            try {
1391                                jmsEx = JMSExceptionSupport.create(er.getException());
1392                            } catch(Throwable e) {
1393                                LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1394                            }
1395                            //dispose of transport for security exceptions
1396                            if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1397                                Transport t = this.transport;
1398                                if (null != t){
1399                                    ServiceSupport.dispose(t);
1400                                }
1401                            }
1402                            if (jmsEx !=null) {
1403                                throw jmsEx;
1404                            }
1405                        }
1406                    }
1407                    return response;
1408                } catch (IOException e) {
1409                    throw JMSExceptionSupport.create(e);
1410                }
1411            }
1412        }
1413    
1414        /**
1415         * Send a packet through a Connection - for internal use only
1416         *
1417         * @param command
1418         * @return
1419         * @throws JMSException
1420         */
1421        public Response syncSendPacket(Command command, int timeout) throws JMSException {
1422            if (isClosed() || closing.get()) {
1423                throw new ConnectionClosedException();
1424            } else {
1425                return doSyncSendPacket(command, timeout);
1426            }
1427        }
1428    
1429        private Response doSyncSendPacket(Command command, int timeout)
1430                throws JMSException {
1431            try {
1432                Response response = (Response) (timeout > 0
1433                        ? this.transport.request(command, timeout)
1434                        : this.transport.request(command));
1435                if (response != null && response.isException()) {
1436                    ExceptionResponse er = (ExceptionResponse)response;
1437                    if (er.getException() instanceof JMSException) {
1438                        throw (JMSException)er.getException();
1439                    } else {
1440                        throw JMSExceptionSupport.create(er.getException());
1441                    }
1442                }
1443                return response;
1444            } catch (IOException e) {
1445                throw JMSExceptionSupport.create(e);
1446            }
1447        }
1448    
1449        /**
1450         * @return statistics for this Connection
1451         */
1452        @Override
1453        public StatsImpl getStats() {
1454            return stats;
1455        }
1456    
1457        /**
1458         * simply throws an exception if the Connection is already closed or the
1459         * Transport has failed
1460         *
1461         * @throws JMSException
1462         */
1463        protected synchronized void checkClosedOrFailed() throws JMSException {
1464            checkClosed();
1465            if (transportFailed.get()) {
1466                throw new ConnectionFailedException(firstFailureError);
1467            }
1468        }
1469    
1470        /**
1471         * simply throws an exception if the Connection is already closed
1472         *
1473         * @throws JMSException
1474         */
1475        protected synchronized void checkClosed() throws JMSException {
1476            if (closed.get()) {
1477                throw new ConnectionClosedException();
1478            }
1479        }
1480    
1481        /**
1482         * Send the ConnectionInfo to the Broker
1483         *
1484         * @throws JMSException
1485         */
1486        protected void ensureConnectionInfoSent() throws JMSException {
1487            synchronized(this.ensureConnectionInfoSentMutex) {
1488                // Can we skip sending the ConnectionInfo packet??
1489                if (isConnectionInfoSentToBroker || closed.get()) {
1490                    return;
1491                }
1492                //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1493                if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1494                    info.setClientId(clientIdGenerator.generateId());
1495                }
1496                syncSendPacket(info.copy());
1497    
1498                this.isConnectionInfoSentToBroker = true;
1499                // Add a temp destination advisory consumer so that
1500                // We know what the valid temporary destinations are on the
1501                // broker without having to do an RPC to the broker.
1502    
1503                ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1504                if (watchTopicAdvisories) {
1505                    advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1506                }
1507            }
1508        }
1509    
1510        public synchronized boolean isWatchTopicAdvisories() {
1511            return watchTopicAdvisories;
1512        }
1513    
1514        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1515            this.watchTopicAdvisories = watchTopicAdvisories;
1516        }
1517    
1518        /**
1519         * @return Returns the useAsyncSend.
1520         */
1521        public boolean isUseAsyncSend() {
1522            return useAsyncSend;
1523        }
1524    
1525        /**
1526         * Forces the use of <a
1527         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1528         * adds a massive performance boost; but means that the send() method will
1529         * return immediately whether the message has been sent or not which could
1530         * lead to message loss.
1531         */
1532        public void setUseAsyncSend(boolean useAsyncSend) {
1533            this.useAsyncSend = useAsyncSend;
1534        }
1535    
1536        /**
1537         * @return true if always sync send messages
1538         */
1539        public boolean isAlwaysSyncSend() {
1540            return this.alwaysSyncSend;
1541        }
1542    
1543        /**
1544         * Set true if always require messages to be sync sent
1545         *
1546         * @param alwaysSyncSend
1547         */
1548        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1549            this.alwaysSyncSend = alwaysSyncSend;
1550        }
1551    
1552        /**
1553         * @return the messagePrioritySupported
1554         */
1555        public boolean isMessagePrioritySupported() {
1556            return this.messagePrioritySupported;
1557        }
1558    
1559        /**
1560         * @param messagePrioritySupported the messagePrioritySupported to set
1561         */
1562        public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1563            this.messagePrioritySupported = messagePrioritySupported;
1564        }
1565    
1566        /**
1567         * Cleans up this connection so that it's state is as if the connection was
1568         * just created. This allows the Resource Adapter to clean up a connection
1569         * so that it can be reused without having to close and recreate the
1570         * connection.
1571         */
1572        public void cleanup() throws JMSException {
1573    
1574            if (advisoryConsumer != null && !isTransportFailed()) {
1575                advisoryConsumer.dispose();
1576                advisoryConsumer = null;
1577            }
1578    
1579            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1580                ActiveMQSession s = i.next();
1581                s.dispose();
1582            }
1583            for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1584                ActiveMQConnectionConsumer c = i.next();
1585                c.dispose();
1586            }
1587            for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1588                ActiveMQInputStream c = i.next();
1589                c.dispose();
1590            }
1591            for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1592                ActiveMQOutputStream c = i.next();
1593                c.dispose();
1594            }
1595    
1596            if (isConnectionInfoSentToBroker) {
1597                if (!transportFailed.get() && !closing.get()) {
1598                    syncSendPacket(info.createRemoveCommand());
1599                }
1600                isConnectionInfoSentToBroker = false;
1601            }
1602            if (userSpecifiedClientID) {
1603                info.setClientId(null);
1604                userSpecifiedClientID = false;
1605            }
1606            clientIDSet = false;
1607    
1608            started.set(false);
1609        }
1610    
1611        @Override
1612        public void finalize() throws Throwable{
1613            Scheduler s = this.scheduler;
1614            if (s != null){
1615                s.stop();
1616            }
1617        }
1618    
1619        /**
1620         * Changes the associated username/password that is associated with this
1621         * connection. If the connection has been used, you must called cleanup()
1622         * before calling this method.
1623         *
1624         * @throws IllegalStateException if the connection is in used.
1625         */
1626        public void changeUserInfo(String userName, String password) throws JMSException {
1627            if (isConnectionInfoSentToBroker) {
1628                throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1629            }
1630            this.info.setUserName(userName);
1631            this.info.setPassword(password);
1632        }
1633    
1634        /**
1635         * @return Returns the resourceManagerId.
1636         * @throws JMSException
1637         */
1638        public String getResourceManagerId() throws JMSException {
1639            waitForBrokerInfo();
1640            if (brokerInfo == null) {
1641                throw new JMSException("Connection failed before Broker info was received.");
1642            }
1643            return brokerInfo.getBrokerId().getValue();
1644        }
1645    
1646        /**
1647         * Returns the broker name if one is available or null if one is not
1648         * available yet.
1649         */
1650        public String getBrokerName() {
1651            try {
1652                brokerInfoReceived.await(5, TimeUnit.SECONDS);
1653                if (brokerInfo == null) {
1654                    return null;
1655                }
1656                return brokerInfo.getBrokerName();
1657            } catch (InterruptedException e) {
1658                Thread.currentThread().interrupt();
1659                return null;
1660            }
1661        }
1662    
1663        /**
1664         * Returns the broker information if it is available or null if it is not
1665         * available yet.
1666         */
1667        public BrokerInfo getBrokerInfo() {
1668            return brokerInfo;
1669        }
1670    
1671        /**
1672         * @return Returns the RedeliveryPolicy.
1673         * @throws JMSException
1674         */
1675        public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1676            return redeliveryPolicyMap.getDefaultEntry();
1677        }
1678    
1679        /**
1680         * Sets the redelivery policy to be used when messages are rolled back
1681         */
1682        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1683            this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1684        }
1685    
1686        public BlobTransferPolicy getBlobTransferPolicy() {
1687            if (blobTransferPolicy == null) {
1688                blobTransferPolicy = createBlobTransferPolicy();
1689            }
1690            return blobTransferPolicy;
1691        }
1692    
1693        /**
1694         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1695         * OBjects) are transferred from producers to brokers to consumers
1696         */
1697        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1698            this.blobTransferPolicy = blobTransferPolicy;
1699        }
1700    
1701        /**
1702         * @return Returns the alwaysSessionAsync.
1703         */
1704        public boolean isAlwaysSessionAsync() {
1705            return alwaysSessionAsync;
1706        }
1707    
1708        /**
1709         * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1710         * the Connection. However, a separate thread is always used if there is more than one session, or the session
1711         * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1712         * happens asynchronously.
1713         */
1714        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1715            this.alwaysSessionAsync = alwaysSessionAsync;
1716        }
1717    
1718        /**
1719         * @return Returns the optimizeAcknowledge.
1720         */
1721        public boolean isOptimizeAcknowledge() {
1722            return optimizeAcknowledge;
1723        }
1724    
1725        /**
1726         * Enables an optimised acknowledgement mode where messages are acknowledged
1727         * in batches rather than individually
1728         *
1729         * @param optimizeAcknowledge The optimizeAcknowledge to set.
1730         */
1731        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1732            this.optimizeAcknowledge = optimizeAcknowledge;
1733        }
1734    
1735        /**
1736         * The max time in milliseconds between optimized ack batches
1737         * @param optimizeAcknowledgeTimeOut
1738         */
1739        public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1740            this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1741        }
1742    
1743        public long getOptimizeAcknowledgeTimeOut() {
1744            return optimizeAcknowledgeTimeOut;
1745        }
1746    
1747        public long getWarnAboutUnstartedConnectionTimeout() {
1748            return warnAboutUnstartedConnectionTimeout;
1749        }
1750    
1751        /**
1752         * Enables the timeout from a connection creation to when a warning is
1753         * generated if the connection is not properly started via {@link #start()}
1754         * and a message is received by a consumer. It is a very common gotcha to
1755         * forget to <a
1756         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1757         * the connection</a> so this option makes the default case to create a
1758         * warning if the user forgets. To disable the warning just set the value to <
1759         * 0 (say -1).
1760         */
1761        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1762            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1763        }
1764    
1765        /**
1766         * @return the sendTimeout
1767         */
1768        public int getSendTimeout() {
1769            return sendTimeout;
1770        }
1771    
1772        /**
1773         * @param sendTimeout the sendTimeout to set
1774         */
1775        public void setSendTimeout(int sendTimeout) {
1776            this.sendTimeout = sendTimeout;
1777        }
1778    
1779        /**
1780         * @return the sendAcksAsync
1781         */
1782        public boolean isSendAcksAsync() {
1783            return sendAcksAsync;
1784        }
1785    
1786        /**
1787         * @param sendAcksAsync the sendAcksAsync to set
1788         */
1789        public void setSendAcksAsync(boolean sendAcksAsync) {
1790            this.sendAcksAsync = sendAcksAsync;
1791        }
1792    
1793        /**
1794         * Returns the time this connection was created
1795         */
1796        public long getTimeCreated() {
1797            return timeCreated;
1798        }
1799    
1800        private void waitForBrokerInfo() throws JMSException {
1801            try {
1802                brokerInfoReceived.await();
1803            } catch (InterruptedException e) {
1804                Thread.currentThread().interrupt();
1805                throw JMSExceptionSupport.create(e);
1806            }
1807        }
1808    
1809        // Package protected so that it can be used in unit tests
1810        public Transport getTransport() {
1811            return transport;
1812        }
1813    
1814        public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1815            producers.put(producerId, producer);
1816        }
1817    
1818        public void removeProducer(ProducerId producerId) {
1819            producers.remove(producerId);
1820        }
1821    
1822        public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1823            dispatchers.put(consumerId, dispatcher);
1824        }
1825    
1826        public void removeDispatcher(ConsumerId consumerId) {
1827            dispatchers.remove(consumerId);
1828        }
1829    
1830        /**
1831         * @param o - the command to consume
1832         */
1833        @Override
1834        public void onCommand(final Object o) {
1835            final Command command = (Command)o;
1836            if (!closed.get() && command != null) {
1837                try {
1838                    command.visit(new CommandVisitorAdapter() {
1839                        @Override
1840                        public Response processMessageDispatch(MessageDispatch md) throws Exception {
1841                            waitForTransportInterruptionProcessingToComplete();
1842                            ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1843                            if (dispatcher != null) {
1844                                // Copy in case a embedded broker is dispatching via
1845                                // vm://
1846                                // md.getMessage() == null to signal end of queue
1847                                // browse.
1848                                Message msg = md.getMessage();
1849                                if (msg != null) {
1850                                    msg = msg.copy();
1851                                    msg.setReadOnlyBody(true);
1852                                    msg.setReadOnlyProperties(true);
1853                                    msg.setRedeliveryCounter(md.getRedeliveryCounter());
1854                                    msg.setConnection(ActiveMQConnection.this);
1855                                    msg.setMemoryUsage(null);
1856                                    md.setMessage(msg);
1857                                }
1858                                dispatcher.dispatch(md);
1859                            }
1860                            return null;
1861                        }
1862    
1863                        @Override
1864                        public Response processProducerAck(ProducerAck pa) throws Exception {
1865                            if (pa != null && pa.getProducerId() != null) {
1866                                ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1867                                if (producer != null) {
1868                                    producer.onProducerAck(pa);
1869                                }
1870                            }
1871                            return null;
1872                        }
1873    
1874                        @Override
1875                        public Response processBrokerInfo(BrokerInfo info) throws Exception {
1876                            brokerInfo = info;
1877                            brokerInfoReceived.countDown();
1878                            optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1879                            getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1880                            return null;
1881                        }
1882    
1883                        @Override
1884                        public Response processConnectionError(final ConnectionError error) throws Exception {
1885                            executor.execute(new Runnable() {
1886                                @Override
1887                                public void run() {
1888                                    onAsyncException(error.getException());
1889                                }
1890                            });
1891                            return null;
1892                        }
1893    
1894                        @Override
1895                        public Response processControlCommand(ControlCommand command) throws Exception {
1896                            onControlCommand(command);
1897                            return null;
1898                        }
1899    
1900                        @Override
1901                        public Response processConnectionControl(ConnectionControl control) throws Exception {
1902                            onConnectionControl((ConnectionControl)command);
1903                            return null;
1904                        }
1905    
1906                        @Override
1907                        public Response processConsumerControl(ConsumerControl control) throws Exception {
1908                            onConsumerControl((ConsumerControl)command);
1909                            return null;
1910                        }
1911    
1912                        @Override
1913                        public Response processWireFormat(WireFormatInfo info) throws Exception {
1914                            onWireFormatInfo((WireFormatInfo)command);
1915                            return null;
1916                        }
1917                    });
1918                } catch (Exception e) {
1919                    onClientInternalException(e);
1920                }
1921            }
1922    
1923            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1924                TransportListener listener = iter.next();
1925                listener.onCommand(command);
1926            }
1927        }
1928    
1929        protected void onWireFormatInfo(WireFormatInfo info) {
1930            protocolVersion.set(info.getVersion());
1931        }
1932    
1933        /**
1934         * Handles async client internal exceptions.
1935         * A client internal exception is usually one that has been thrown
1936         * by a container runtime component during asynchronous processing of a
1937         * message that does not affect the connection itself.
1938         * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1939         * its <code>onException</code> method, if one has been registered with this connection.
1940         *
1941         * @param error the exception that the problem
1942         */
1943        public void onClientInternalException(final Throwable error) {
1944            if ( !closed.get() && !closing.get() ) {
1945                if ( this.clientInternalExceptionListener != null ) {
1946                    executor.execute(new Runnable() {
1947                        @Override
1948                        public void run() {
1949                            ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1950                        }
1951                    });
1952                } else {
1953                    LOG.debug("Async client internal exception occurred with no exception listener registered: "
1954                            + error, error);
1955                }
1956            }
1957        }
1958    
1959        /**
1960         * Used for handling async exceptions
1961         *
1962         * @param error
1963         */
1964        public void onAsyncException(Throwable error) {
1965            if (!closed.get() && !closing.get()) {
1966                if (this.exceptionListener != null) {
1967    
1968                    if (!(error instanceof JMSException)) {
1969                        error = JMSExceptionSupport.create(error);
1970                    }
1971                    final JMSException e = (JMSException)error;
1972    
1973                    executor.execute(new Runnable() {
1974                        @Override
1975                        public void run() {
1976                            ActiveMQConnection.this.exceptionListener.onException(e);
1977                        }
1978                    });
1979    
1980                } else {
1981                    LOG.debug("Async exception with no exception listener: " + error, error);
1982                }
1983            }
1984        }
1985    
1986        @Override
1987        public void onException(final IOException error) {
1988            onAsyncException(error);
1989            if (!closing.get() && !closed.get()) {
1990                executor.execute(new Runnable() {
1991                    @Override
1992                    public void run() {
1993                        transportFailed(error);
1994                        ServiceSupport.dispose(ActiveMQConnection.this.transport);
1995                        brokerInfoReceived.countDown();
1996                        try {
1997                            cleanup();
1998                        } catch (JMSException e) {
1999                            LOG.warn("Exception during connection cleanup, " + e, e);
2000                        }
2001                        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2002                            TransportListener listener = iter.next();
2003                            listener.onException(error);
2004                        }
2005                    }
2006                });
2007            }
2008        }
2009    
2010        @Override
2011        public void transportInterupted() {
2012            this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
2013            if (LOG.isDebugEnabled()) {
2014                LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
2015            }
2016            signalInterruptionProcessingNeeded();
2017    
2018            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2019                ActiveMQSession s = i.next();
2020                s.clearMessagesInProgress();
2021            }
2022    
2023            for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
2024                connectionConsumer.clearMessagesInProgress();
2025            }
2026    
2027            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2028                TransportListener listener = iter.next();
2029                listener.transportInterupted();
2030            }
2031        }
2032    
2033        @Override
2034        public void transportResumed() {
2035            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2036                TransportListener listener = iter.next();
2037                listener.transportResumed();
2038            }
2039        }
2040    
2041        /**
2042         * Create the DestinationInfo object for the temporary destination.
2043         *
2044         * @param topic - if its true topic, else queue.
2045         * @return DestinationInfo
2046         * @throws JMSException
2047         */
2048        protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2049    
2050            // Check if Destination info is of temporary type.
2051            ActiveMQTempDestination dest;
2052            if (topic) {
2053                dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2054            } else {
2055                dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2056            }
2057    
2058            DestinationInfo info = new DestinationInfo();
2059            info.setConnectionId(this.info.getConnectionId());
2060            info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2061            info.setDestination(dest);
2062            syncSendPacket(info);
2063    
2064            dest.setConnection(this);
2065            activeTempDestinations.put(dest, dest);
2066            return dest;
2067        }
2068    
2069        /**
2070         * @param destination
2071         * @throws JMSException
2072         */
2073        public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2074    
2075            checkClosedOrFailed();
2076    
2077            for (ActiveMQSession session : this.sessions) {
2078                if (session.isInUse(destination)) {
2079                    throw new JMSException("A consumer is consuming from the temporary destination");
2080                }
2081            }
2082    
2083            activeTempDestinations.remove(destination);
2084    
2085            DestinationInfo destInfo = new DestinationInfo();
2086            destInfo.setConnectionId(this.info.getConnectionId());
2087            destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2088            destInfo.setDestination(destination);
2089            destInfo.setTimeout(0);
2090            syncSendPacket(destInfo);
2091        }
2092    
2093        public boolean isDeleted(ActiveMQDestination dest) {
2094    
2095            // If we are not watching the advisories.. then
2096            // we will assume that the temp destination does exist.
2097            if (advisoryConsumer == null) {
2098                return false;
2099            }
2100    
2101            return !activeTempDestinations.contains(dest);
2102        }
2103    
    /**
     * @return the current value of the {@code copyMessageOnSend} flag
     */
    public boolean isCopyMessageOnSend() {
        return copyMessageOnSend;
    }
2107    
    /**
     * @return the sequence generator held in {@code localTransactionIdGenerator}
     */
    public LongSequenceGenerator getLocalTransactionIdGenerator() {
        return localTransactionIdGenerator;
    }
2111    
    /**
     * @return the current value of the {@code useCompression} flag
     */
    public boolean isUseCompression() {
        return useCompression;
    }
2115    
    /**
     * Enables the use of compression of the message bodies
     *
     * @param useCompression the new value of the flag
     */
    public void setUseCompression(boolean useCompression) {
        this.useCompression = useCompression;
    }
2122    
2123        public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2124    
2125            checkClosedOrFailed();
2126            ensureConnectionInfoSent();
2127    
2128            DestinationInfo info = new DestinationInfo();
2129            info.setConnectionId(this.info.getConnectionId());
2130            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2131            info.setDestination(destination);
2132            info.setTimeout(0);
2133            syncSendPacket(info);
2134        }
2135    
    /**
     * @return the current value of the {@code dispatchAsync} flag
     */
    public boolean isDispatchAsync() {
        return dispatchAsync;
    }
2139    
    /**
     * Enables or disables the default setting of whether or not consumers have
     * their messages <a
     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
     * synchronously or asynchronously by the broker</a>. For non-durable
     * topics, for example, we typically dispatch synchronously by default to
     * minimize context switches, which boosts performance. However, sometimes
     * it is better to go slower to ensure that a single blocked consumer
     * socket does not block delivery to other consumers.
     *
     * @param asyncDispatch If true then consumers created on this connection
     *                will default to having their messages dispatched
     *                asynchronously. The default value is true.
     */
    public void setDispatchAsync(boolean asyncDispatch) {
        this.dispatchAsync = asyncDispatch;
    }
2157    
    /**
     * @return the current value of the {@code objectMessageSerializationDefered} flag
     */
    public boolean isObjectMessageSerializationDefered() {
        return objectMessageSerializationDefered;
    }
2161    
    /**
     * When an object is set on an ObjectMessage, the JMS spec requires the
     * object to be serialized by that set method. Enabling this flag causes the
     * object to not get serialized. The object may subsequently get serialized
     * if the message needs to be sent over a socket or stored to disk.
     *
     * @param objectMessageSerializationDefered the new value of the flag
     */
    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
    }
2171    
    /**
     * Creates an input stream for the destination without a message selector;
     * delegates to the two-argument overload with a {@code null} selector.
     *
     * @param dest the destination to read from
     * @return the created input stream
     * @throws JMSException propagated from the delegated overload
     */
    @Override
    public InputStream createInputStream(Destination dest) throws JMSException {
        return createInputStream(dest, null);
    }
2176    
    /**
     * Creates an input stream for the destination using the given selector;
     * delegates to the three-argument overload with {@code noLocal} set to
     * {@code false}.
     *
     * @param dest the destination to read from
     * @param messageSelector the selector passed through to the stream
     * @return the created input stream
     * @throws JMSException propagated from the delegated overload
     */
    @Override
    public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
        return createInputStream(dest, messageSelector, false);
    }
2181    
    /**
     * Creates an input stream for the destination; delegates to the
     * four-argument overload with a timeout of {@code -1}.
     *
     * @param dest the destination to read from
     * @param messageSelector the selector passed through to the stream
     * @param noLocal the noLocal flag passed through to the stream
     * @return the created input stream
     * @throws JMSException propagated from the delegated overload
     */
    @Override
    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
        return createInputStream(dest, messageSelector, noLocal,  -1);
    }
2186    
    /**
     * Creates an input stream for the destination; delegates to
     * {@code doCreateInputStream} with a {@code null} subscription name.
     *
     * @param dest the destination to read from
     * @param messageSelector the selector passed through to the stream
     * @param noLocal the noLocal flag passed through to the stream
     * @param timeout the timeout value passed through to the stream
     * @return the created input stream
     * @throws JMSException propagated from {@code doCreateInputStream}
     */
    @Override
    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
        return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
    }
2191    
2192        @Override
2193        public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2194            return createInputStream(dest, null, false);
2195        }
2196    
    /**
     * Creates an input stream for the topic under the given subscription name;
     * delegates to the four-argument overload with {@code noLocal} set to
     * {@code false}.
     *
     * @param dest the topic to read from
     * @param name the subscription name handed to the stream
     * @param messageSelector the selector passed through to the stream
     * @return the created input stream
     * @throws JMSException propagated from the delegated overload
     */
    @Override
    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
        return createDurableInputStream(dest, name, messageSelector, false);
    }
2201    
    /**
     * Creates an input stream for the topic under the given subscription name;
     * delegates to the five-argument overload with a timeout of {@code -1}.
     *
     * @param dest the topic to read from
     * @param name the subscription name handed to the stream
     * @param messageSelector the selector passed through to the stream
     * @param noLocal the noLocal flag passed through to the stream
     * @return the created input stream
     * @throws JMSException propagated from the delegated overload
     */
    @Override
    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
        return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
    }
2206    
    /**
     * Creates an input stream for the topic under the given subscription name;
     * delegates to {@code doCreateInputStream}, passing {@code name} as the
     * subscription name.
     *
     * @param dest the topic to read from
     * @param name the subscription name handed to the stream
     * @param messageSelector the selector passed through to the stream
     * @param noLocal the noLocal flag passed through to the stream
     * @param timeout the timeout value passed through to the stream
     * @return the created input stream
     * @throws JMSException propagated from {@code doCreateInputStream}
     */
    @Override
    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
        return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
    }
2211    
2212        private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2213            checkClosedOrFailed();
2214            ensureConnectionInfoSent();
2215            return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2216        }
2217    
    /**
     * Creates a persistent output stream; individual messages will be written
     * to disk/database by the broker.
     * <p>
     * Delegates to the five-argument overload with no stream properties and
     * the {@code ActiveMQMessage} default delivery mode, priority and time to
     * live.
     * NOTE(review): "persistent" relies on
     * {@code ActiveMQMessage.DEFAULT_DELIVERY_MODE} being persistent - confirm.
     *
     * @param dest the destination to write to
     * @return the created output stream
     * @throws JMSException propagated from the delegated overload
     */
    @Override
    public OutputStream createOutputStream(Destination dest) throws JMSException {
        return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
    }
2226    
    /**
     * Creates a non persistent output stream; messages will not be written to
     * disk.
     * <p>
     * Delegates to the five-argument {@code createOutputStream} with no stream
     * properties, {@code DeliveryMode.NON_PERSISTENT} and the
     * {@code ActiveMQMessage} default priority and time to live.
     *
     * @param dest the destination to write to
     * @return the created output stream
     * @throws JMSException propagated from the delegated overload
     */
    public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
        return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
    }
2234    
2235        /**
2236         * Creates an output stream allowing full control over the delivery mode,
2237         * the priority and time to live of the messages and the properties added to
2238         * messages on the stream.
2239         *
2240         * @param streamProperties defines a map of key-value pairs where the keys
2241         *                are strings and the values are primitive values (numbers
2242         *                and strings) which are appended to the messages similarly
2243         *                to using the
2244         *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2245         *                method
2246         */
2247        @Override
2248        public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2249            checkClosedOrFailed();
2250            ensureConnectionInfoSent();
2251            return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2252        }
2253    
2254        /**
2255         * Unsubscribes a durable subscription that has been created by a client.
2256         * <P>
2257         * This method deletes the state being maintained on behalf of the
2258         * subscriber by its provider.
2259         * <P>
2260         * It is erroneous for a client to delete a durable subscription while there
2261         * is an active <CODE>MessageConsumer </CODE> or
2262         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2263         * message is part of a pending transaction or has not been acknowledged in
2264         * the session.
2265         *
2266         * @param name the name used to identify this subscription
2267         * @throws JMSException if the session fails to unsubscribe to the durable
2268         *                 subscription due to some internal error.
2269         * @throws InvalidDestinationException if an invalid subscription name is
2270         *                 specified.
2271         * @since 1.1
2272         */
2273        @Override
2274        public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2275            checkClosedOrFailed();
2276            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2277            rsi.setConnectionId(getConnectionInfo().getConnectionId());
2278            rsi.setSubscriptionName(name);
2279            rsi.setClientId(getConnectionInfo().getClientId());
2280            syncSendPacket(rsi);
2281        }
2282    
2283        /**
2284         * Internal send method optimized: - It does not copy the message - It can
2285         * only handle ActiveMQ messages. - You can specify if the send is async or
2286         * sync - Does not allow you to send /w a transaction.
2287         */
2288        void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2289            checkClosedOrFailed();
2290    
2291            if (destination.isTemporary() && isDeleted(destination)) {
2292                throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2293            }
2294    
2295            msg.setJMSDestination(destination);
2296            msg.setJMSDeliveryMode(deliveryMode);
2297            long expiration = 0L;
2298    
2299            if (!isDisableTimeStampsByDefault()) {
2300                long timeStamp = System.currentTimeMillis();
2301                msg.setJMSTimestamp(timeStamp);
2302                if (timeToLive > 0) {
2303                    expiration = timeToLive + timeStamp;
2304                }
2305            }
2306    
2307            msg.setJMSExpiration(expiration);
2308            msg.setJMSPriority(priority);
2309            msg.setJMSRedelivered(false);
2310            msg.setMessageId(messageId);
2311            msg.onSend();
2312            msg.setProducerId(msg.getMessageId().getProducerId());
2313    
2314            if (LOG.isDebugEnabled()) {
2315                LOG.debug("Sending message: " + msg);
2316            }
2317    
2318            if (async) {
2319                asyncSendPacket(msg);
2320            } else {
2321                syncSendPacket(msg);
2322            }
2323        }
2324    
    /**
     * Adds the stream to this connection's {@code outputStreams} collection.
     *
     * @param stream the output stream to track
     */
    public void addOutputStream(ActiveMQOutputStream stream) {
        outputStreams.add(stream);
    }
2328    
    /**
     * Removes the stream from this connection's {@code outputStreams} collection.
     *
     * @param stream the output stream to stop tracking
     */
    public void removeOutputStream(ActiveMQOutputStream stream) {
        outputStreams.remove(stream);
    }
2332    
    /**
     * Adds the stream to this connection's {@code inputStreams} collection.
     *
     * @param stream the input stream to track
     */
    public void addInputStream(ActiveMQInputStream stream) {
        inputStreams.add(stream);
    }
2336    
    /**
     * Removes the stream from this connection's {@code inputStreams} collection.
     *
     * @param stream the input stream to stop tracking
     */
    public void removeInputStream(ActiveMQInputStream stream) {
        inputStreams.remove(stream);
    }
2340    
2341        protected void onControlCommand(ControlCommand command) {
2342            String text = command.getCommand();
2343            if (text != null) {
2344                if ("shutdown".equals(text)) {
2345                    LOG.info("JVM told to shutdown");
2346                    System.exit(0);
2347                }
2348    
2349                // TODO Should we handle the "close" case?
2350                // if (false && "close".equals(text)){
2351                //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2352                //     try {
2353                //         close();
2354                //     } catch (JMSException e) {
2355                //     }
2356                // }
2357            }
2358        }
2359    
2360        protected void onConnectionControl(ConnectionControl command) {
2361            if (command.isFaultTolerant()) {
2362                this.optimizeAcknowledge = false;
2363                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2364                    ActiveMQSession s = i.next();
2365                    s.setOptimizeAcknowledge(false);
2366                }
2367            }
2368        }
2369    
2370        protected void onConsumerControl(ConsumerControl command) {
2371            if (command.isClose()) {
2372                for (ActiveMQSession session : this.sessions) {
2373                    session.close(command.getConsumerId());
2374                }
2375            } else {
2376                for (ActiveMQSession session : this.sessions) {
2377                    session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2378                }
2379                for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2380                    ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2381                    if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2382                        consumerInfo.setPrefetchSize(command.getPrefetch());
2383                    }
2384                }
2385            }
2386        }
2387    
    /**
     * Records that the transport has failed by setting the
     * {@code transportFailed} flag, and remembers the error if no earlier
     * failure has been stored.
     *
     * @param error the I/O failure that caused the transport to fail
     */
    protected void transportFailed(IOException error) {
        transportFailed.set(true);
        // Only the first stored failure is kept; later errors do not overwrite it.
        if (firstFailureError == null) {
            firstFailureError = error;
        }
    }
2394    
    /**
     * Should a JMS message be copied to a new JMS Message object as part of the
     * send() method in JMS. This is enabled by default to be compliant with the
     * JMS specification. You can disable it if you do not mutate JMS messages
     * after they are sent for a performance boost
     *
     * @param copyMessageOnSend the new value of the flag
     */
    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
        this.copyMessageOnSend = copyMessageOnSend;
    }
2404    
2405        @Override
2406        public String toString() {
2407            return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2408        }
2409    
    /**
     * @return a newly constructed {@code BlobTransferPolicy}
     */
    protected BlobTransferPolicy createBlobTransferPolicy() {
        return new BlobTransferPolicy();
    }
2413    
    /**
     * @return the value currently held by {@code protocolVersion}
     */
    public int getProtocolVersion() {
        return protocolVersion.get();
    }
2417    
    /**
     * @return the configured {@code producerWindowSize}
     */
    public int getProducerWindowSize() {
        return producerWindowSize;
    }
2421    
    /**
     * @param producerWindowSize the new {@code producerWindowSize} value
     */
    public void setProducerWindowSize(int producerWindowSize) {
        this.producerWindowSize = producerWindowSize;
    }
2425    
    /**
     * Passes the audit depth on to {@code connectionAudit}.
     *
     * @param auditDepth the audit depth to apply
     */
    public void setAuditDepth(int auditDepth) {
        connectionAudit.setAuditDepth(auditDepth);
    }
2429    
    /**
     * Passes the maximum producer number on to {@code connectionAudit}.
     *
     * @param auditMaximumProducerNumber the maximum producer number to apply
     */
    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
    }
2433    
    /**
     * Removes the dispatcher from {@code connectionAudit}.
     *
     * @param dispatcher the dispatcher to remove
     */
    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
        connectionAudit.removeDispatcher(dispatcher);
    }
2437    
2438        protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2439            return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2440        }
2441    
    /**
     * Delegates to {@code connectionAudit.rollbackDuplicate} for the given
     * dispatcher and message.
     *
     * @param dispatcher the dispatcher the message belongs to
     * @param message the message to roll back in the audit
     */
    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
        connectionAudit.rollbackDuplicate(dispatcher, message);
    }
2445    
    /**
     * @return the error stored by {@code transportFailed(IOException)}, or
     *         {@code null} if none has been stored
     */
    public IOException getFirstFailureError() {
        return firstFailureError;
    }
2449    
2450        protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2451            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2452            if (cdl != null) {
2453                if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2454                    LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2455                    cdl.await(10, TimeUnit.SECONDS);
2456                }
2457                signalInterruptionProcessingComplete();
2458            }
2459        }
2460    
2461        protected void transportInterruptionProcessingComplete() {
2462            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2463            if (cdl != null) {
2464                cdl.countDown();
2465                try {
2466                    signalInterruptionProcessingComplete();
2467                } catch (InterruptedException ignored) {}
2468            }
2469        }
2470    
2471        private void signalInterruptionProcessingComplete() throws InterruptedException {
2472            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2473            if (cdl.getCount()==0) {
2474                if (LOG.isDebugEnabled()) {
2475                    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2476                }
2477                this.transportInterruptionProcessingComplete = null;
2478    
2479                FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2480                if (failoverTransport != null) {
2481                    failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2482                    if (LOG.isDebugEnabled()) {
2483                        LOG.debug("notified failover transport (" + failoverTransport
2484                                + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2485                    }
2486                }
2487    
2488            }
2489        }
2490    
2491        private void signalInterruptionProcessingNeeded() {
2492            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2493            if (failoverTransport != null) {
2494                failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2495                if (LOG.isDebugEnabled()) {
2496                    LOG.debug("notified failover transport (" + failoverTransport
2497                            + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2498                }
2499            }
2500        }
2501    
    /**
     * Specifies the amount of time in milliseconds that a consumer with a
     * transaction pending recovery will wait to receive re-dispatched
     * messages. The default value is 0, so there is no wait by default.
     *
     * @param consumerFailoverRedeliveryWaitPeriod the wait period in milliseconds
     */
    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
    }
2510    
    /**
     * @return the configured {@code consumerFailoverRedeliveryWaitPeriod}
     */
    public long getConsumerFailoverRedeliveryWaitPeriod() {
        return consumerFailoverRedeliveryWaitPeriod;
    }
2514    
2515        protected Scheduler getScheduler() throws JMSException {
2516            Scheduler result = scheduler;
2517            if (result == null) {
2518                synchronized (this) {
2519                    result = scheduler;
2520                    if (result == null) {
2521                        checkClosed();
2522                        try {
2523                            result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2524                            scheduler.start();
2525                        } catch(Exception e) {
2526                            throw JMSExceptionSupport.create(e);
2527                        }
2528                    }
2529                }
2530            }
2531            return result;
2532        }
2533    
    /**
     * @return the executor held in the {@code executor} field
     */
    protected ThreadPoolExecutor getExecutor() {
        return this.executor;
    }
2537    
    /**
     * @return the current value of the {@code checkForDuplicates} flag, which
     *         gates the duplicate check performed by {@code isDuplicate}
     */
    public boolean isCheckForDuplicates() {
        return this.checkForDuplicates;
    }
2544    
    /**
     * @param checkForDuplicates the new value of the flag that gates the
     *                duplicate check performed by {@code isDuplicate}
     */
    public void setCheckForDuplicates(boolean checkForDuplicates) {
        this.checkForDuplicates = checkForDuplicates;
    }
2551    
    /**
     * @return the current value of the {@code transactedIndividualAck} flag
     */
    public boolean isTransactedIndividualAck() {
        return transactedIndividualAck;
    }
2555    
    /**
     * @param transactedIndividualAck the new value of the flag
     */
    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
        this.transactedIndividualAck = transactedIndividualAck;
    }
2559    
    /**
     * @return the current value of the {@code nonBlockingRedelivery} flag
     */
    public boolean isNonBlockingRedelivery() {
        return nonBlockingRedelivery;
    }
2563    
    /**
     * @param nonBlockingRedelivery the new value of the flag
     */
    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
        this.nonBlockingRedelivery = nonBlockingRedelivery;
    }
2567    
2568        /**
2569         * Removes any TempDestinations that this connection has cached, ignoring
2570         * any exceptions generated because the destination is in use as they should
2571         * not be removed.
2572         * Used from a pooled connection, b/c it will not be explicitly closed.
2573         */
2574        public void cleanUpTempDestinations() {
2575    
2576            if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2577                return;
2578            }
2579    
2580            Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2581                = this.activeTempDestinations.entrySet().iterator();
2582            while(entries.hasNext()) {
2583                ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2584                try {
2585                    // Only delete this temp destination if it was created from this connection. The connection used
2586                    // for the advisory consumer may also have a reference to this temp destination.
2587                    ActiveMQTempDestination dest = entry.getValue();
2588                    String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2589                    if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2590                        this.deleteTempDestination(entry.getValue());
2591                    }
2592                } catch (Exception ex) {
2593                    // the temp dest is in use so it can not be deleted.
2594                    // it is ok to leave it to connection tear down phase
2595                }
2596            }
2597        }
2598    
    /**
     * Sets the connection-wide RedeliveryPolicyMap for handling messages that
     * are being rolled back.
     *
     * @param redeliveryPolicyMap the redeliveryPolicyMap to set
     */
    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
        this.redeliveryPolicyMap = redeliveryPolicyMap;
    }
2606    
    /**
     * Gets the Connection's configured RedeliveryPolicyMap, which will be used
     * by all the Consumers when dealing with transaction messages that have
     * been rolled back.
     *
     * @return the redeliveryPolicyMap
     */
    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
        return redeliveryPolicyMap;
    }
2616    
    /**
     * @return the configured {@code maxThreadPoolSize}
     */
    public int getMaxThreadPoolSize() {
        return maxThreadPoolSize;
    }
2620    
    /**
     * Stores the new {@code maxThreadPoolSize} value in this connection's field.
     *
     * @param maxThreadPoolSize the new {@code maxThreadPoolSize} value
     */
    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
        this.maxThreadPoolSize = maxThreadPoolSize;
    }
2624    
    /**
     * Enable enforcement of QueueConnection semantics by setting the
     * {@code queueOnlyConnection} flag to {@code true}.
     *
     * @return this object, useful for chaining
     */
    ActiveMQConnection enforceQueueOnlyConnection() {
        this.queueOnlyConnection = true;
        return this;
    }
2634    
    /**
     * @return the configured {@code rejectedTaskHandler}
     */
    public RejectedExecutionHandler getRejectedTaskHandler() {
        return rejectedTaskHandler;
    }
2638    
    /**
     * Stores the handler in this connection's {@code rejectedTaskHandler} field.
     *
     * @param rejectedTaskHandler the handler to store
     */
    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
        this.rejectedTaskHandler = rejectedTaskHandler;
    }
2642    
    /**
     * Gets the configured time interval that is used to force all
     * MessageConsumers that have optimizedAcknowledge enabled to send an ack
     * for any outstanding Message Acks. By default this value is set to zero,
     * meaning that the consumers will not do any background Message
     * acknowledgment.
     *
     * @return the scheduledOptimizedAckInterval
     */
    public long getOptimizedAckScheduledAckInterval() {
        return optimizedAckScheduledAckInterval;
    }
2653    
    /**
     * Sets the amount of time between scheduled sends of any outstanding
     * Message Acks for consumers that have been configured with
     * optimizeAcknowledge enabled.
     *
     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
     */
    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
    }
2663    }