001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.RejectedExecutionHandler;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036
037import javax.jms.Connection;
038import javax.jms.ConnectionConsumer;
039import javax.jms.ConnectionMetaData;
040import javax.jms.Destination;
041import javax.jms.ExceptionListener;
042import javax.jms.IllegalStateException;
043import javax.jms.InvalidDestinationException;
044import javax.jms.JMSException;
045import javax.jms.Queue;
046import javax.jms.QueueConnection;
047import javax.jms.QueueSession;
048import javax.jms.ServerSessionPool;
049import javax.jms.Session;
050import javax.jms.Topic;
051import javax.jms.TopicConnection;
052import javax.jms.TopicSession;
053import javax.jms.XAConnection;
054
055import org.apache.activemq.advisory.DestinationSource;
056import org.apache.activemq.blob.BlobTransferPolicy;
057import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
058import org.apache.activemq.command.ActiveMQDestination;
059import org.apache.activemq.command.ActiveMQMessage;
060import org.apache.activemq.command.ActiveMQTempDestination;
061import org.apache.activemq.command.ActiveMQTempQueue;
062import org.apache.activemq.command.ActiveMQTempTopic;
063import org.apache.activemq.command.BrokerInfo;
064import org.apache.activemq.command.Command;
065import org.apache.activemq.command.CommandTypes;
066import org.apache.activemq.command.ConnectionControl;
067import org.apache.activemq.command.ConnectionError;
068import org.apache.activemq.command.ConnectionId;
069import org.apache.activemq.command.ConnectionInfo;
070import org.apache.activemq.command.ConsumerControl;
071import org.apache.activemq.command.ConsumerId;
072import org.apache.activemq.command.ConsumerInfo;
073import org.apache.activemq.command.ControlCommand;
074import org.apache.activemq.command.DestinationInfo;
075import org.apache.activemq.command.ExceptionResponse;
076import org.apache.activemq.command.Message;
077import org.apache.activemq.command.MessageDispatch;
078import org.apache.activemq.command.MessageId;
079import org.apache.activemq.command.ProducerAck;
080import org.apache.activemq.command.ProducerId;
081import org.apache.activemq.command.RemoveInfo;
082import org.apache.activemq.command.RemoveSubscriptionInfo;
083import org.apache.activemq.command.Response;
084import org.apache.activemq.command.SessionId;
085import org.apache.activemq.command.ShutdownInfo;
086import org.apache.activemq.command.WireFormatInfo;
087import org.apache.activemq.management.JMSConnectionStatsImpl;
088import org.apache.activemq.management.JMSStatsImpl;
089import org.apache.activemq.management.StatsCapable;
090import org.apache.activemq.management.StatsImpl;
091import org.apache.activemq.state.CommandVisitorAdapter;
092import org.apache.activemq.thread.Scheduler;
093import org.apache.activemq.thread.TaskRunnerFactory;
094import org.apache.activemq.transport.FutureResponse;
095import org.apache.activemq.transport.RequestTimedOutIOException;
096import org.apache.activemq.transport.ResponseCallback;
097import org.apache.activemq.transport.Transport;
098import org.apache.activemq.transport.TransportListener;
099import org.apache.activemq.transport.failover.FailoverTransport;
100import org.apache.activemq.util.IdGenerator;
101import org.apache.activemq.util.IntrospectionSupport;
102import org.apache.activemq.util.JMSExceptionSupport;
103import org.apache.activemq.util.LongSequenceGenerator;
104import org.apache.activemq.util.ServiceSupport;
105import org.apache.activemq.util.ThreadPoolUtils;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
110
111    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
112    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
113    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
114    public static int DEFAULT_THREAD_POOL_SIZE = 1000;
115
116    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
117
118    public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
119
120    protected boolean dispatchAsync=true;
121    protected boolean alwaysSessionAsync = true;
122
123    private TaskRunnerFactory sessionTaskRunner;
124    private final ThreadPoolExecutor executor;
125
126    // Connection state variables
127    private final ConnectionInfo info;
128    private ExceptionListener exceptionListener;
129    private ClientInternalExceptionListener clientInternalExceptionListener;
130    private boolean clientIDSet;
131    private boolean isConnectionInfoSentToBroker;
132    private boolean userSpecifiedClientID;
133
134    // Configuration options variables
135    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
136    private BlobTransferPolicy blobTransferPolicy;
137    private RedeliveryPolicyMap redeliveryPolicyMap;
138    private MessageTransformer transformer;
139
140    private boolean disableTimeStampsByDefault;
141    private boolean optimizedMessageDispatch = true;
142    private boolean copyMessageOnSend = true;
143    private boolean useCompression;
144    private boolean objectMessageSerializationDefered;
145    private boolean useAsyncSend;
146    private boolean optimizeAcknowledge;
147    private long optimizeAcknowledgeTimeOut = 0;
148    private long optimizedAckScheduledAckInterval = 0;
149    private boolean nestedMapAndListEnabled = true;
150    private boolean useRetroactiveConsumer;
151    private boolean exclusiveConsumer;
152    private boolean alwaysSyncSend;
153    private int closeTimeout = 15000;
154    private boolean watchTopicAdvisories = true;
155    private long warnAboutUnstartedConnectionTimeout = 500L;
156    private int sendTimeout =0;
157    private boolean sendAcksAsync=true;
158    private boolean checkForDuplicates = true;
159    private boolean queueOnlyConnection = false;
160    private boolean consumerExpiryCheckEnabled = true;
161
162    private final Transport transport;
163    private final IdGenerator clientIdGenerator;
164    private final JMSStatsImpl factoryStats;
165    private final JMSConnectionStatsImpl stats;
166
167    private final AtomicBoolean started = new AtomicBoolean(false);
168    private final AtomicBoolean closing = new AtomicBoolean(false);
169    private final AtomicBoolean closed = new AtomicBoolean(false);
170    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
171    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
172    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
173    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
174
175    // Maps ConsumerIds to ActiveMQConsumer objects
176    private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
177    private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
178    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
179    private final SessionId connectionSessionId;
180    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
181    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
182    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
183
184    private AdvisoryConsumer advisoryConsumer;
185    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
186    private BrokerInfo brokerInfo;
187    private IOException firstFailureError;
188    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
189
190    // Assume that protocol is the latest. Change to the actual protocol
191    // version when a WireFormatInfo is received.
192    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
193    private final long timeCreated;
194    private final ConnectionAudit connectionAudit = new ConnectionAudit();
195    private DestinationSource destinationSource;
196    private final Object ensureConnectionInfoSentMutex = new Object();
197    private boolean useDedicatedTaskRunner;
198    protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
199    private long consumerFailoverRedeliveryWaitPeriod;
200    private Scheduler scheduler;
201    private boolean messagePrioritySupported = false;
202    private boolean transactedIndividualAck = false;
203    private boolean nonBlockingRedelivery = false;
204    private boolean rmIdFromConnectionId = false;
205
206    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
207    private RejectedExecutionHandler rejectedTaskHandler = null;
208
209    /**
210     * Construct an <code>ActiveMQConnection</code>
211     *
212     * @param transport
213     * @param factoryStats
214     * @throws Exception
215     */
216    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
217
218        this.transport = transport;
219        this.clientIdGenerator = clientIdGenerator;
220        this.factoryStats = factoryStats;
221
222        // Configure a single threaded executor who's core thread can timeout if
223        // idle
224        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
225            @Override
226            public Thread newThread(Runnable r) {
227                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
228                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
229                //thread.setDaemon(true);
230                return thread;
231            }
232        });
233        // asyncConnectionThread.allowCoreThreadTimeOut(true);
234        String uniqueId = connectionIdGenerator.generateId();
235        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
236        this.info.setManageable(true);
237        this.info.setFaultTolerant(transport.isFaultTolerant());
238        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
239
240        this.transport.setTransportListener(this);
241
242        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
243        this.factoryStats.addConnection(this);
244        this.timeCreated = System.currentTimeMillis();
245        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
246    }
247
248    protected void setUserName(String userName) {
249        this.info.setUserName(userName);
250    }
251
252    protected void setPassword(String password) {
253        this.info.setPassword(password);
254    }
255
256    /**
257     * A static helper method to create a new connection
258     *
259     * @return an ActiveMQConnection
260     * @throws JMSException
261     */
262    public static ActiveMQConnection makeConnection() throws JMSException {
263        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
264        return (ActiveMQConnection)factory.createConnection();
265    }
266
267    /**
268     * A static helper method to create a new connection
269     *
270     * @param uri
271     * @return and ActiveMQConnection
272     * @throws JMSException
273     */
274    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
275        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
276        return (ActiveMQConnection)factory.createConnection();
277    }
278
279    /**
280     * A static helper method to create a new connection
281     *
282     * @param user
283     * @param password
284     * @param uri
285     * @return an ActiveMQConnection
286     * @throws JMSException
287     */
288    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
289        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
290        return (ActiveMQConnection)factory.createConnection();
291    }
292
293    /**
294     * @return a number unique for this connection
295     */
296    public JMSConnectionStatsImpl getConnectionStats() {
297        return stats;
298    }
299
300    /**
301     * Creates a <CODE>Session</CODE> object.
302     *
303     * @param transacted indicates whether the session is transacted
304     * @param acknowledgeMode indicates whether the consumer or the client will
305     *                acknowledge any messages it receives; ignored if the
306     *                session is transacted. Legal values are
307     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
308     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
309     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
310     * @return a newly created session
311     * @throws JMSException if the <CODE>Connection</CODE> object fails to
312     *                 create a session due to some internal error or lack of
313     *                 support for the specific transaction and acknowledgement
314     *                 mode.
315     * @see Session#AUTO_ACKNOWLEDGE
316     * @see Session#CLIENT_ACKNOWLEDGE
317     * @see Session#DUPS_OK_ACKNOWLEDGE
318     * @since 1.1
319     */
320    @Override
321    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
322        checkClosedOrFailed();
323        ensureConnectionInfoSent();
324        if(!transacted) {
325            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
326                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
327            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
328                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
329                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
330            }
331        }
332        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
333            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
334    }
335
336    /**
337     * @return sessionId
338     */
339    protected SessionId getNextSessionId() {
340        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
341    }
342
343    /**
344     * Gets the client identifier for this connection.
345     * <P>
346     * This value is specific to the JMS provider. It is either preconfigured by
347     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
348     * dynamically by the application by calling the <code>setClientID</code>
349     * method.
350     *
351     * @return the unique client identifier
352     * @throws JMSException if the JMS provider fails to return the client ID
353     *                 for this connection due to some internal error.
354     */
355    @Override
356    public String getClientID() throws JMSException {
357        checkClosedOrFailed();
358        return this.info.getClientId();
359    }
360
361    /**
362     * Sets the client identifier for this connection.
363     * <P>
364     * The preferred way to assign a JMS client's client identifier is for it to
365     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
366     * object and transparently assigned to the <CODE>Connection</CODE> object
367     * it creates.
368     * <P>
369     * Alternatively, a client can set a connection's client identifier using a
370     * provider-specific value. The facility to set a connection's client
371     * identifier explicitly is not a mechanism for overriding the identifier
372     * that has been administratively configured. It is provided for the case
373     * where no administratively specified identifier exists. If one does exist,
374     * an attempt to change it by setting it must throw an
375     * <CODE>IllegalStateException</CODE>. If a client sets the client
376     * identifier explicitly, it must do so immediately after it creates the
377     * connection and before any other action on the connection is taken. After
378     * this point, setting the client identifier is a programming error that
379     * should throw an <CODE>IllegalStateException</CODE>.
380     * <P>
381     * The purpose of the client identifier is to associate a connection and its
382     * objects with a state maintained on behalf of the client by a provider.
383     * The only such state identified by the JMS API is that required to support
384     * durable subscriptions.
385     * <P>
386     * If another connection with the same <code>clientID</code> is already
387     * running when this method is called, the JMS provider should detect the
388     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
389     *
390     * @param newClientID the unique client identifier
391     * @throws JMSException if the JMS provider fails to set the client ID for
392     *                 this connection due to some internal error.
393     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
394     *                 invalid or duplicate client ID.
395     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
396     *                 a connection's client ID at the wrong time or when it has
397     *                 been administratively configured.
398     */
399    @Override
400    public void setClientID(String newClientID) throws JMSException {
401        checkClosedOrFailed();
402
403        if (this.clientIDSet) {
404            throw new IllegalStateException("The clientID has already been set");
405        }
406
407        if (this.isConnectionInfoSentToBroker) {
408            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
409        }
410
411        this.info.setClientId(newClientID);
412        this.userSpecifiedClientID = true;
413        ensureConnectionInfoSent();
414    }
415
416    /**
417     * Sets the default client id that the connection will use if explicitly not
418     * set with the setClientId() call.
419     */
420    public void setDefaultClientID(String clientID) throws JMSException {
421        this.info.setClientId(clientID);
422        this.userSpecifiedClientID = true;
423    }
424
425    /**
426     * Gets the metadata for this connection.
427     *
428     * @return the connection metadata
429     * @throws JMSException if the JMS provider fails to get the connection
430     *                 metadata for this connection.
431     * @see javax.jms.ConnectionMetaData
432     */
433    @Override
434    public ConnectionMetaData getMetaData() throws JMSException {
435        checkClosedOrFailed();
436        return ActiveMQConnectionMetaData.INSTANCE;
437    }
438
439    /**
440     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
441     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
442     * associated with it.
443     *
444     * @return the <CODE>ExceptionListener</CODE> for this connection, or
445     *         null, if no <CODE>ExceptionListener</CODE> is associated with
446     *         this connection.
447     * @throws JMSException if the JMS provider fails to get the
448     *                 <CODE>ExceptionListener</CODE> for this connection.
449     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
450     */
451    @Override
452    public ExceptionListener getExceptionListener() throws JMSException {
453        checkClosedOrFailed();
454        return this.exceptionListener;
455    }
456
457    /**
458     * Sets an exception listener for this connection.
459     * <P>
460     * If a JMS provider detects a serious problem with a connection, it informs
461     * the connection's <CODE> ExceptionListener</CODE>, if one has been
462     * registered. It does this by calling the listener's <CODE>onException
463     * </CODE>
464     * method, passing it a <CODE>JMSException</CODE> object describing the
465     * problem.
466     * <P>
467     * An exception listener allows a client to be notified of a problem
468     * asynchronously. Some connections only consume messages, so they would
469     * have no other way to learn their connection has failed.
470     * <P>
471     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
472     * <P>
473     * A JMS provider should attempt to resolve connection problems itself
474     * before it notifies the client of them.
475     *
476     * @param listener the exception listener
477     * @throws JMSException if the JMS provider fails to set the exception
478     *                 listener for this connection.
479     */
480    @Override
481    public void setExceptionListener(ExceptionListener listener) throws JMSException {
482        checkClosedOrFailed();
483        this.exceptionListener = listener;
484    }
485
486    /**
487     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
488     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
489     * associated with it.
490     *
491     * @return the listener or <code>null</code> if no listener is registered with the connection.
492     */
493    public ClientInternalExceptionListener getClientInternalExceptionListener() {
494        return clientInternalExceptionListener;
495    }
496
497    /**
498     * Sets a client internal exception listener for this connection.
499     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
500     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
501     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
502     * describing the problem.
503     *
504     * @param listener the exception listener
505     */
506    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
507        this.clientInternalExceptionListener = listener;
508    }
509
510    /**
511     * Starts (or restarts) a connection's delivery of incoming messages. A call
512     * to <CODE>start</CODE> on a connection that has already been started is
513     * ignored.
514     *
515     * @throws JMSException if the JMS provider fails to start message delivery
516     *                 due to some internal error.
517     * @see javax.jms.Connection#stop()
518     */
519    @Override
520    public void start() throws JMSException {
521        checkClosedOrFailed();
522        ensureConnectionInfoSent();
523        if (started.compareAndSet(false, true)) {
524            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
525                ActiveMQSession session = i.next();
526                session.start();
527            }
528        }
529    }
530
531    /**
532     * Temporarily stops a connection's delivery of incoming messages. Delivery
533     * can be restarted using the connection's <CODE>start</CODE> method. When
534     * the connection is stopped, delivery to all the connection's message
535     * consumers is inhibited: synchronous receives block, and messages are not
536     * delivered to message listeners.
537     * <P>
538     * This call blocks until receives and/or message listeners in progress have
539     * completed.
540     * <P>
541     * Stopping a connection has no effect on its ability to send messages. A
542     * call to <CODE>stop</CODE> on a connection that has already been stopped
543     * is ignored.
544     * <P>
545     * A call to <CODE>stop</CODE> must not return until delivery of messages
546     * has paused. This means that a client can rely on the fact that none of
547     * its message listeners will be called and that all threads of control
548     * waiting for <CODE>receive</CODE> calls to return will not return with a
549     * message until the connection is restarted. The receive timers for a
550     * stopped connection continue to advance, so receives may time out while
551     * the connection is stopped.
552     * <P>
553     * If message listeners are running when <CODE>stop</CODE> is invoked, the
554     * <CODE>stop</CODE> call must wait until all of them have returned before
555     * it may return. While these message listeners are completing, they must
556     * have the full services of the connection available to them.
557     *
558     * @throws JMSException if the JMS provider fails to stop message delivery
559     *                 due to some internal error.
560     * @see javax.jms.Connection#start()
561     */
562    @Override
563    public void stop() throws JMSException {
564        doStop(true);
565    }
566
567    /**
568     * @see #stop()
569     * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed,
570     *                    <tt>false</tt> to skip this check
571     * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
572     */
573    void doStop(boolean checkClosed) throws JMSException {
574        if (checkClosed) {
575            checkClosedOrFailed();
576        }
577        if (started.compareAndSet(true, false)) {
578            synchronized(sessions) {
579                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
580                    ActiveMQSession s = i.next();
581                    s.stop();
582                }
583            }
584        }
585    }
586
587    /**
588     * Closes the connection.
589     * <P>
590     * Since a provider typically allocates significant resources outside the
591     * JVM on behalf of a connection, clients should close these resources when
592     * they are not needed. Relying on garbage collection to eventually reclaim
593     * these resources may not be timely enough.
594     * <P>
595     * There is no need to close the sessions, producers, and consumers of a
596     * closed connection.
597     * <P>
598     * Closing a connection causes all temporary destinations to be deleted.
599     * <P>
600     * When this method is invoked, it should not return until message
601     * processing has been shut down in an orderly fashion. This means that all
602     * message listeners that may have been running have returned, and that all
603     * pending receives have returned. A close terminates all pending message
604     * receives on the connection's sessions' consumers. The receives may return
605     * with a message or with null, depending on whether there was a message
606     * available at the time of the close. If one or more of the connection's
607     * sessions' message listeners is processing a message at the time when
608     * connection <CODE>close</CODE> is invoked, all the facilities of the
609     * connection and its sessions must remain available to those listeners
610     * until they return control to the JMS provider.
611     * <P>
612     * Closing a connection causes any of its sessions' transactions in progress
613     * to be rolled back. In the case where a session's work is coordinated by
614     * an external transaction manager, a session's <CODE>commit</CODE> and
615     * <CODE> rollback</CODE> methods are not used and the result of a closed
616     * session's work is determined later by the transaction manager. Closing a
617     * connection does NOT force an acknowledgment of client-acknowledged
618     * sessions.
619     * <P>
620     * Invoking the <CODE>acknowledge</CODE> method of a received message from
621     * a closed connection's session must throw an
622     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
623     * NOT throw an exception.
624     *
625     * @throws JMSException if the JMS provider fails to close the connection
626     *                 due to some internal error. For example, a failure to
627     *                 release resources or to close a socket connection can
628     *                 cause this exception to be thrown.
629     */
630    @Override
631    public void close() throws JMSException {
632        try {
633            // If we were running, lets stop first.
634            if (!closed.get() && !transportFailed.get()) {
635                // do not fail if already closed as according to JMS spec we must not
636                // throw exception if already closed
637                doStop(false);
638            }
639
640            synchronized (this) {
641                if (!closed.get()) {
642                    closing.set(true);
643
644                    if (destinationSource != null) {
645                        destinationSource.stop();
646                        destinationSource = null;
647                    }
648                    if (advisoryConsumer != null) {
649                        advisoryConsumer.dispose();
650                        advisoryConsumer = null;
651                    }
652
653                    Scheduler scheduler = this.scheduler;
654                    if (scheduler != null) {
655                        try {
656                            scheduler.stop();
657                        } catch (Exception e) {
658                            JMSException ex =  JMSExceptionSupport.create(e);
659                            throw ex;
660                        }
661                    }
662
663                    long lastDeliveredSequenceId = -1;
664                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
665                        ActiveMQSession s = i.next();
666                        s.dispose();
667                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
668                    }
669                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
670                        ActiveMQConnectionConsumer c = i.next();
671                        c.dispose();
672                    }
673
674                    this.activeTempDestinations.clear();
675
676                    try {
677                        if (isConnectionInfoSentToBroker) {
678                            // If we announced ourselves to the broker.. Try to let the broker
679                            // know that the connection is being shutdown.
680                            RemoveInfo removeCommand = info.createRemoveCommand();
681                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
682                            try {
683                                doSyncSendPacket(removeCommand, closeTimeout);
684                            } catch (JMSException e) {
685                                if (e.getCause() instanceof RequestTimedOutIOException) {
686                                    // expected
687                                } else {
688                                    throw e;
689                                }
690                            }
691                            doAsyncSendPacket(new ShutdownInfo());
692                        }
693                    } finally { // release anyway even if previous communication fails
694                        started.set(false);
695
696                        // TODO if we move the TaskRunnerFactory to the connection
697                        // factory
698                        // then we may need to call
699                        // factory.onConnectionClose(this);
700                        if (sessionTaskRunner != null) {
701                            sessionTaskRunner.shutdown();
702                        }
703                        closed.set(true);
704                        closing.set(false);
705                    }
706                }
707            }
708        } finally {
709            try {
710                if (executor != null) {
711                    ThreadPoolUtils.shutdown(executor);
712                }
713            } catch (Throwable e) {
714                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
715            }
716
717            ServiceSupport.dispose(this.transport);
718
719            factoryStats.removeConnection(this);
720        }
721    }
722
723    /**
724     * Tells the broker to terminate its VM. This can be used to cleanly
725     * terminate a broker running in a standalone java process. Server must have
726     * property enable.vm.shutdown=true defined to allow this to work.
727     */
728    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
729    // implemented.
730    /*
731     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
732     * command = new BrokerAdminCommand();
733     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
734     * asyncSendPacket(command); }
735     */
736
737    /**
738     * Create a durable connection consumer for this connection (optional
739     * operation). This is an expert facility not used by regular JMS clients.
740     *
741     * @param topic topic to access
742     * @param subscriptionName durable subscription name
743     * @param messageSelector only messages with properties matching the message
744     *                selector expression are delivered. A value of null or an
745     *                empty string indicates that there is no message selector
746     *                for the message consumer.
747     * @param sessionPool the server session pool to associate with this durable
748     *                connection consumer
749     * @param maxMessages the maximum number of messages that can be assigned to
750     *                a server session at one time
751     * @return the durable connection consumer
752     * @throws JMSException if the <CODE>Connection</CODE> object fails to
753     *                 create a connection consumer due to some internal error
754     *                 or invalid arguments for <CODE>sessionPool</CODE> and
755     *                 <CODE>messageSelector</CODE>.
756     * @throws javax.jms.InvalidDestinationException if an invalid destination
757     *                 is specified.
758     * @throws javax.jms.InvalidSelectorException if the message selector is
759     *                 invalid.
760     * @see javax.jms.ConnectionConsumer
761     * @since 1.1
762     */
763    @Override
764    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
765        throws JMSException {
766        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
767    }
768
769    /**
770     * Create a durable connection consumer for this connection (optional
771     * operation). This is an expert facility not used by regular JMS clients.
772     *
773     * @param topic topic to access
774     * @param subscriptionName durable subscription name
775     * @param messageSelector only messages with properties matching the message
776     *                selector expression are delivered. A value of null or an
777     *                empty string indicates that there is no message selector
778     *                for the message consumer.
779     * @param sessionPool the server session pool to associate with this durable
780     *                connection consumer
781     * @param maxMessages the maximum number of messages that can be assigned to
782     *                a server session at one time
783     * @param noLocal set true if you want to filter out messages published
784     *                locally
785     * @return the durable connection consumer
786     * @throws JMSException if the <CODE>Connection</CODE> object fails to
787     *                 create a connection consumer due to some internal error
788     *                 or invalid arguments for <CODE>sessionPool</CODE> and
789     *                 <CODE>messageSelector</CODE>.
790     * @throws javax.jms.InvalidDestinationException if an invalid destination
791     *                 is specified.
792     * @throws javax.jms.InvalidSelectorException if the message selector is
793     *                 invalid.
794     * @see javax.jms.ConnectionConsumer
795     * @since 1.1
796     */
797    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
798                                                              boolean noLocal) throws JMSException {
799        checkClosedOrFailed();
800
801        if (queueOnlyConnection) {
802            throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
803        }
804
805        ensureConnectionInfoSent();
806        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
807        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
808        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
809        info.setSubscriptionName(subscriptionName);
810        info.setSelector(messageSelector);
811        info.setPrefetchSize(maxMessages);
812        info.setDispatchAsync(isDispatchAsync());
813
814        // Allows the options on the destination to configure the consumerInfo
815        if (info.getDestination().getOptions() != null) {
816            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
817            IntrospectionSupport.setProperties(this.info, options, "consumer.");
818        }
819
820        return new ActiveMQConnectionConsumer(this, sessionPool, info);
821    }
822
823    // Properties
824    // -------------------------------------------------------------------------
825
826    /**
827     * Returns true if this connection has been started
828     *
829     * @return true if this Connection is started
830     */
831    public boolean isStarted() {
832        return started.get();
833    }
834
835    /**
836     * Returns true if the connection is closed
837     */
838    public boolean isClosed() {
839        return closed.get();
840    }
841
842    /**
843     * Returns true if the connection is in the process of being closed
844     */
845    public boolean isClosing() {
846        return closing.get();
847    }
848
849    /**
850     * Returns true if the underlying transport has failed
851     */
852    public boolean isTransportFailed() {
853        return transportFailed.get();
854    }
855
856    /**
857     * @return Returns the prefetchPolicy.
858     */
859    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
860        return prefetchPolicy;
861    }
862
863    /**
864     * Sets the <a
865     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
866     * policy</a> for consumers created by this connection.
867     */
868    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
869        this.prefetchPolicy = prefetchPolicy;
870    }
871
872    /**
873     */
874    public Transport getTransportChannel() {
875        return transport;
876    }
877
878    /**
879     * @return Returns the clientID of the connection, forcing one to be
880     *         generated if one has not yet been configured.
881     */
882    public String getInitializedClientID() throws JMSException {
883        ensureConnectionInfoSent();
884        return info.getClientId();
885    }
886
887    /**
888     * @return Returns the timeStampsDisableByDefault.
889     */
890    public boolean isDisableTimeStampsByDefault() {
891        return disableTimeStampsByDefault;
892    }
893
894    /**
895     * Sets whether or not timestamps on messages should be disabled or not. If
896     * you disable them it adds a small performance boost.
897     */
898    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
899        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
900    }
901
902    /**
903     * @return Returns the dispatchOptimizedMessage.
904     */
905    public boolean isOptimizedMessageDispatch() {
906        return optimizedMessageDispatch;
907    }
908
909    /**
910     * If this flag is set then an larger prefetch limit is used - only
911     * applicable for durable topic subscribers.
912     */
913    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
914        this.optimizedMessageDispatch = dispatchOptimizedMessage;
915    }
916
917    /**
918     * @return Returns the closeTimeout.
919     */
920    public int getCloseTimeout() {
921        return closeTimeout;
922    }
923
924    /**
925     * Sets the timeout before a close is considered complete. Normally a
926     * close() on a connection waits for confirmation from the broker; this
927     * allows that operation to timeout to save the client hanging if there is
928     * no broker
929     */
930    public void setCloseTimeout(int closeTimeout) {
931        this.closeTimeout = closeTimeout;
932    }
933
934    /**
935     * @return ConnectionInfo
936     */
937    public ConnectionInfo getConnectionInfo() {
938        return this.info;
939    }
940
941    public boolean isUseRetroactiveConsumer() {
942        return useRetroactiveConsumer;
943    }
944
945    /**
946     * Sets whether or not retroactive consumers are enabled. Retroactive
947     * consumers allow non-durable topic subscribers to receive old messages
948     * that were published before the non-durable subscriber started.
949     */
950    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
951        this.useRetroactiveConsumer = useRetroactiveConsumer;
952    }
953
954    public boolean isNestedMapAndListEnabled() {
955        return nestedMapAndListEnabled;
956    }
957
958    /**
959     * Enables/disables whether or not Message properties and MapMessage entries
960     * support <a
961     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
962     * Structures</a> of Map and List objects
963     */
964    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
965        this.nestedMapAndListEnabled = structuredMapsEnabled;
966    }
967
968    public boolean isExclusiveConsumer() {
969        return exclusiveConsumer;
970    }
971
972    /**
973     * Enables or disables whether or not queue consumers should be exclusive or
974     * not for example to preserve ordering when not using <a
975     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
976     *
977     * @param exclusiveConsumer
978     */
979    public void setExclusiveConsumer(boolean exclusiveConsumer) {
980        this.exclusiveConsumer = exclusiveConsumer;
981    }
982
983    /**
984     * Adds a transport listener so that a client can be notified of events in
985     * the underlying transport
986     */
987    public void addTransportListener(TransportListener transportListener) {
988        transportListeners.add(transportListener);
989    }
990
991    public void removeTransportListener(TransportListener transportListener) {
992        transportListeners.remove(transportListener);
993    }
994
995    public boolean isUseDedicatedTaskRunner() {
996        return useDedicatedTaskRunner;
997    }
998
999    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1000        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1001    }
1002
1003    public TaskRunnerFactory getSessionTaskRunner() {
1004        synchronized (this) {
1005            if (sessionTaskRunner == null) {
1006                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1007                sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1008            }
1009        }
1010        return sessionTaskRunner;
1011    }
1012
1013    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1014        this.sessionTaskRunner = sessionTaskRunner;
1015    }
1016
1017    public MessageTransformer getTransformer() {
1018        return transformer;
1019    }
1020
1021    /**
1022     * Sets the transformer used to transform messages before they are sent on
1023     * to the JMS bus or when they are received from the bus but before they are
1024     * delivered to the JMS client
1025     */
1026    public void setTransformer(MessageTransformer transformer) {
1027        this.transformer = transformer;
1028    }
1029
1030    /**
1031     * @return the statsEnabled
1032     */
1033    public boolean isStatsEnabled() {
1034        return this.stats.isEnabled();
1035    }
1036
1037    /**
1038     * @param statsEnabled the statsEnabled to set
1039     */
1040    public void setStatsEnabled(boolean statsEnabled) {
1041        this.stats.setEnabled(statsEnabled);
1042    }
1043
1044    /**
1045     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1046     * being created or destroyed or to enquire about the current destinations available on the broker
1047     *
1048     * @return a lazily created destination source
1049     * @throws JMSException
1050     */
1051    @Override
1052    public DestinationSource getDestinationSource() throws JMSException {
1053        if (destinationSource == null) {
1054            destinationSource = new DestinationSource(this);
1055            destinationSource.start();
1056        }
1057        return destinationSource;
1058    }
1059
1060    // Implementation methods
1061    // -------------------------------------------------------------------------
1062
1063    /**
1064     * Used internally for adding Sessions to the Connection
1065     *
1066     * @param session
1067     * @throws JMSException
1068     * @throws JMSException
1069     */
1070    protected void addSession(ActiveMQSession session) throws JMSException {
1071        this.sessions.add(session);
1072        if (sessions.size() > 1 || session.isTransacted()) {
1073            optimizedMessageDispatch = false;
1074        }
1075    }
1076
1077    /**
1078     * Used interanlly for removing Sessions from a Connection
1079     *
1080     * @param session
1081     */
1082    protected void removeSession(ActiveMQSession session) {
1083        this.sessions.remove(session);
1084        this.removeDispatcher(session);
1085    }
1086
1087    /**
1088     * Add a ConnectionConsumer
1089     *
1090     * @param connectionConsumer
1091     * @throws JMSException
1092     */
1093    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1094        this.connectionConsumers.add(connectionConsumer);
1095    }
1096
1097    /**
1098     * Remove a ConnectionConsumer
1099     *
1100     * @param connectionConsumer
1101     */
1102    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1103        this.connectionConsumers.remove(connectionConsumer);
1104        this.removeDispatcher(connectionConsumer);
1105    }
1106
1107    /**
1108     * Creates a <CODE>TopicSession</CODE> object.
1109     *
1110     * @param transacted indicates whether the session is transacted
1111     * @param acknowledgeMode indicates whether the consumer or the client will
1112     *                acknowledge any messages it receives; ignored if the
1113     *                session is transacted. Legal values are
1114     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1115     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1116     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1117     * @return a newly created topic session
1118     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1119     *                 to create a session due to some internal error or lack of
1120     *                 support for the specific transaction and acknowledgement
1121     *                 mode.
1122     * @see Session#AUTO_ACKNOWLEDGE
1123     * @see Session#CLIENT_ACKNOWLEDGE
1124     * @see Session#DUPS_OK_ACKNOWLEDGE
1125     */
1126    @Override
1127    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1128        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1129    }
1130
1131    /**
1132     * Creates a connection consumer for this connection (optional operation).
1133     * This is an expert facility not used by regular JMS clients.
1134     *
1135     * @param topic the topic to access
1136     * @param messageSelector only messages with properties matching the message
1137     *                selector expression are delivered. A value of null or an
1138     *                empty string indicates that there is no message selector
1139     *                for the message consumer.
1140     * @param sessionPool the server session pool to associate with this
1141     *                connection consumer
1142     * @param maxMessages the maximum number of messages that can be assigned to
1143     *                a server session at one time
1144     * @return the connection consumer
1145     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1146     *                 to create a connection consumer due to some internal
1147     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1148     *                 and <CODE>messageSelector</CODE>.
1149     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1150     *                 specified.
1151     * @throws javax.jms.InvalidSelectorException if the message selector is
1152     *                 invalid.
1153     * @see javax.jms.ConnectionConsumer
1154     */
1155    @Override
1156    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1157        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1158    }
1159
1160    /**
1161     * Creates a connection consumer for this connection (optional operation).
1162     * This is an expert facility not used by regular JMS clients.
1163     *
1164     * @param queue the queue to access
1165     * @param messageSelector only messages with properties matching the message
1166     *                selector expression are delivered. A value of null or an
1167     *                empty string indicates that there is no message selector
1168     *                for the message consumer.
1169     * @param sessionPool the server session pool to associate with this
1170     *                connection consumer
1171     * @param maxMessages the maximum number of messages that can be assigned to
1172     *                a server session at one time
1173     * @return the connection consumer
1174     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1175     *                 to create a connection consumer due to some internal
1176     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1177     *                 and <CODE>messageSelector</CODE>.
1178     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1179     *                 specified.
1180     * @throws javax.jms.InvalidSelectorException if the message selector is
1181     *                 invalid.
1182     * @see javax.jms.ConnectionConsumer
1183     */
1184    @Override
1185    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1186        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1187    }
1188
1189    /**
1190     * Creates a connection consumer for this connection (optional operation).
1191     * This is an expert facility not used by regular JMS clients.
1192     *
1193     * @param destination the destination to access
1194     * @param messageSelector only messages with properties matching the message
1195     *                selector expression are delivered. A value of null or an
1196     *                empty string indicates that there is no message selector
1197     *                for the message consumer.
1198     * @param sessionPool the server session pool to associate with this
1199     *                connection consumer
1200     * @param maxMessages the maximum number of messages that can be assigned to
1201     *                a server session at one time
1202     * @return the connection consumer
1203     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1204     *                 create a connection consumer due to some internal error
1205     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1206     *                 <CODE>messageSelector</CODE>.
1207     * @throws javax.jms.InvalidDestinationException if an invalid destination
1208     *                 is specified.
1209     * @throws javax.jms.InvalidSelectorException if the message selector is
1210     *                 invalid.
1211     * @see javax.jms.ConnectionConsumer
1212     * @since 1.1
1213     */
1214    @Override
1215    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1216        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1217    }
1218
1219    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1220        throws JMSException {
1221
1222        checkClosedOrFailed();
1223        ensureConnectionInfoSent();
1224
1225        ConsumerId consumerId = createConsumerId();
1226        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1227        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1228        consumerInfo.setSelector(messageSelector);
1229        consumerInfo.setPrefetchSize(maxMessages);
1230        consumerInfo.setNoLocal(noLocal);
1231        consumerInfo.setDispatchAsync(isDispatchAsync());
1232
1233        // Allows the options on the destination to configure the consumerInfo
1234        if (consumerInfo.getDestination().getOptions() != null) {
1235            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1236            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1237        }
1238
1239        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1240    }
1241
1242    /**
1243     * @return a newly created ConsumedId unique to this connection session instance.
1244     */
1245    private ConsumerId createConsumerId() {
1246        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1247    }
1248
1249    /**
1250     * Creates a <CODE>QueueSession</CODE> object.
1251     *
1252     * @param transacted indicates whether the session is transacted
1253     * @param acknowledgeMode indicates whether the consumer or the client will
1254     *                acknowledge any messages it receives; ignored if the
1255     *                session is transacted. Legal values are
1256     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1257     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1258     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1259     * @return a newly created queue session
1260     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1261     *                 to create a session due to some internal error or lack of
1262     *                 support for the specific transaction and acknowledgement
1263     *                 mode.
1264     * @see Session#AUTO_ACKNOWLEDGE
1265     * @see Session#CLIENT_ACKNOWLEDGE
1266     * @see Session#DUPS_OK_ACKNOWLEDGE
1267     */
1268    @Override
1269    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1270        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1271    }
1272
1273    /**
1274     * Ensures that the clientID was manually specified and not auto-generated.
1275     * If the clientID was not specified this method will throw an exception.
1276     * This method is used to ensure that the clientID + durableSubscriber name
1277     * are used correctly.
1278     *
1279     * @throws JMSException
1280     */
1281    public void checkClientIDWasManuallySpecified() throws JMSException {
1282        if (!userSpecifiedClientID) {
1283            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1284        }
1285    }
1286
1287    /**
1288     * send a Packet through the Connection - for internal use only
1289     *
1290     * @param command
1291     * @throws JMSException
1292     */
1293    public void asyncSendPacket(Command command) throws JMSException {
1294        if (isClosed()) {
1295            throw new ConnectionClosedException();
1296        } else {
1297            doAsyncSendPacket(command);
1298        }
1299    }
1300
1301    private void doAsyncSendPacket(Command command) throws JMSException {
1302        try {
1303            this.transport.oneway(command);
1304        } catch (IOException e) {
1305            throw JMSExceptionSupport.create(e);
1306        }
1307    }
1308
1309    /**
1310     * Send a packet through a Connection - for internal use only
1311     *
1312     * @param command
1313     *
1314     * @throws JMSException
1315     */
1316    public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1317        if(onComplete==null) {
1318            syncSendPacket(command);
1319        } else {
1320            if (isClosed()) {
1321                throw new ConnectionClosedException();
1322            }
1323            try {
1324                this.transport.asyncRequest(command, new ResponseCallback() {
1325                    @Override
1326                    public void onCompletion(FutureResponse resp) {
1327                        Response response;
1328                        Throwable exception = null;
1329                        try {
1330                            response = resp.getResult();
1331                            if (response.isException()) {
1332                                ExceptionResponse er = (ExceptionResponse)response;
1333                                exception = er.getException();
1334                            }
1335                        } catch (Exception e) {
1336                            exception = e;
1337                        }
1338                        if(exception!=null) {
1339                            if ( exception instanceof JMSException) {
1340                                onComplete.onException((JMSException) exception);
1341                            } else {
1342                                if (isClosed()||closing.get()) {
1343                                    LOG.debug("Received an exception but connection is closing");
1344                                }
1345                                JMSException jmsEx = null;
1346                                try {
1347                                    jmsEx = JMSExceptionSupport.create(exception);
1348                                } catch(Throwable e) {
1349                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1350                                }
1351                                // dispose of transport for security exceptions on connection initiation
1352                                if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1353                                    forceCloseOnSecurityException(exception);
1354                                }
1355                                if (jmsEx !=null) {
1356                                    onComplete.onException(jmsEx);
1357                                }
1358                            }
1359                        } else {
1360                            onComplete.onSuccess();
1361                        }
1362                    }
1363                });
1364            } catch (IOException e) {
1365                throw JMSExceptionSupport.create(e);
1366            }
1367        }
1368    }
1369
1370    private void forceCloseOnSecurityException(Throwable exception) {
1371        LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
1372        onException(new IOException("Force close due to SecurityException on connect", exception));
1373    }
1374
1375    public Response syncSendPacket(Command command) throws JMSException {
1376        if (isClosed()) {
1377            throw new ConnectionClosedException();
1378        } else {
1379
1380            try {
1381                Response response = (Response)this.transport.request(command);
1382                if (response.isException()) {
1383                    ExceptionResponse er = (ExceptionResponse)response;
1384                    if (er.getException() instanceof JMSException) {
1385                        throw (JMSException)er.getException();
1386                    } else {
1387                        if (isClosed()||closing.get()) {
1388                            LOG.debug("Received an exception but connection is closing");
1389                        }
1390                        JMSException jmsEx = null;
1391                        try {
1392                            jmsEx = JMSExceptionSupport.create(er.getException());
1393                        } catch(Throwable e) {
1394                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1395                        }
1396                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1397                            forceCloseOnSecurityException(er.getException());
1398                        }
1399                        if (jmsEx !=null) {
1400                            throw jmsEx;
1401                        }
1402                    }
1403                }
1404                return response;
1405            } catch (IOException e) {
1406                throw JMSExceptionSupport.create(e);
1407            }
1408        }
1409    }
1410
1411    /**
1412     * Send a packet through a Connection - for internal use only
1413     *
1414     * @param command
1415     *
1416     * @return the broker Response for the given Command.
1417     *
1418     * @throws JMSException
1419     */
1420    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1421        if (isClosed() || closing.get()) {
1422            throw new ConnectionClosedException();
1423        } else {
1424            return doSyncSendPacket(command, timeout);
1425        }
1426    }
1427
1428    private Response doSyncSendPacket(Command command, int timeout)
1429            throws JMSException {
1430        try {
1431            Response response = (Response) (timeout > 0
1432                    ? this.transport.request(command, timeout)
1433                    : this.transport.request(command));
1434            if (response != null && response.isException()) {
1435                ExceptionResponse er = (ExceptionResponse)response;
1436                if (er.getException() instanceof JMSException) {
1437                    throw (JMSException)er.getException();
1438                } else {
1439                    throw JMSExceptionSupport.create(er.getException());
1440                }
1441            }
1442            return response;
1443        } catch (IOException e) {
1444            throw JMSExceptionSupport.create(e);
1445        }
1446    }
1447
1448    /**
1449     * @return statistics for this Connection
1450     */
1451    @Override
1452    public StatsImpl getStats() {
1453        return stats;
1454    }
1455
1456    /**
1457     * simply throws an exception if the Connection is already closed or the
1458     * Transport has failed
1459     *
1460     * @throws JMSException
1461     */
1462    protected synchronized void checkClosedOrFailed() throws JMSException {
1463        checkClosed();
1464        if (transportFailed.get()) {
1465            throw new ConnectionFailedException(firstFailureError);
1466        }
1467    }
1468
1469    /**
1470     * simply throws an exception if the Connection is already closed
1471     *
1472     * @throws JMSException
1473     */
1474    protected synchronized void checkClosed() throws JMSException {
1475        if (closed.get()) {
1476            throw new ConnectionClosedException();
1477        }
1478    }
1479
1480    /**
1481     * Send the ConnectionInfo to the Broker
1482     *
1483     * @throws JMSException
1484     */
1485    protected void ensureConnectionInfoSent() throws JMSException {
1486        synchronized(this.ensureConnectionInfoSentMutex) {
1487            // Can we skip sending the ConnectionInfo packet??
1488            if (isConnectionInfoSentToBroker || closed.get()) {
1489                return;
1490            }
1491            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1492            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1493                info.setClientId(clientIdGenerator.generateId());
1494            }
1495            syncSendPacket(info.copy());
1496
1497            this.isConnectionInfoSentToBroker = true;
1498            // Add a temp destination advisory consumer so that
1499            // We know what the valid temporary destinations are on the
1500            // broker without having to do an RPC to the broker.
1501
1502            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1503            if (watchTopicAdvisories) {
1504                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1505            }
1506        }
1507    }
1508
1509    public synchronized boolean isWatchTopicAdvisories() {
1510        return watchTopicAdvisories;
1511    }
1512
1513    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1514        this.watchTopicAdvisories = watchTopicAdvisories;
1515    }
1516
1517    /**
1518     * @return Returns the useAsyncSend.
1519     */
1520    public boolean isUseAsyncSend() {
1521        return useAsyncSend;
1522    }
1523
1524    /**
1525     * Forces the use of <a
1526     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1527     * adds a massive performance boost; but means that the send() method will
1528     * return immediately whether the message has been sent or not which could
1529     * lead to message loss.
1530     */
1531    public void setUseAsyncSend(boolean useAsyncSend) {
1532        this.useAsyncSend = useAsyncSend;
1533    }
1534
1535    /**
1536     * @return true if always sync send messages
1537     */
1538    public boolean isAlwaysSyncSend() {
1539        return this.alwaysSyncSend;
1540    }
1541
1542    /**
1543     * Set true if always require messages to be sync sent
1544     *
1545     * @param alwaysSyncSend
1546     */
1547    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1548        this.alwaysSyncSend = alwaysSyncSend;
1549    }
1550
1551    /**
1552     * @return the messagePrioritySupported
1553     */
1554    public boolean isMessagePrioritySupported() {
1555        return this.messagePrioritySupported;
1556    }
1557
1558    /**
1559     * @param messagePrioritySupported the messagePrioritySupported to set
1560     */
1561    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1562        this.messagePrioritySupported = messagePrioritySupported;
1563    }
1564
1565    /**
1566     * Cleans up this connection so that it's state is as if the connection was
1567     * just created. This allows the Resource Adapter to clean up a connection
1568     * so that it can be reused without having to close and recreate the
1569     * connection.
1570     */
1571    public void cleanup() throws JMSException {
1572        doCleanup(false);
1573    }
1574
1575    public void doCleanup(boolean removeConnection) throws JMSException {
1576        if (advisoryConsumer != null && !isTransportFailed()) {
1577            advisoryConsumer.dispose();
1578            advisoryConsumer = null;
1579        }
1580
1581        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1582            ActiveMQSession s = i.next();
1583            s.dispose();
1584        }
1585        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1586            ActiveMQConnectionConsumer c = i.next();
1587            c.dispose();
1588        }
1589
1590        if (removeConnection) {
1591            if (isConnectionInfoSentToBroker) {
1592                if (!transportFailed.get() && !closing.get()) {
1593                    syncSendPacket(info.createRemoveCommand());
1594                }
1595                isConnectionInfoSentToBroker = false;
1596            }
1597            if (userSpecifiedClientID) {
1598                info.setClientId(null);
1599                userSpecifiedClientID = false;
1600            }
1601            clientIDSet = false;
1602        }
1603
1604        started.set(false);
1605    }
1606
1607    /**
1608     * Changes the associated username/password that is associated with this
1609     * connection. If the connection has been used, you must called cleanup()
1610     * before calling this method.
1611     *
1612     * @throws IllegalStateException if the connection is in used.
1613     */
1614    public void changeUserInfo(String userName, String password) throws JMSException {
1615        if (isConnectionInfoSentToBroker) {
1616            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1617        }
1618        this.info.setUserName(userName);
1619        this.info.setPassword(password);
1620    }
1621
1622    /**
1623     * @return Returns the resourceManagerId.
1624     * @throws JMSException
1625     */
1626    public String getResourceManagerId() throws JMSException {
1627        if (isRmIdFromConnectionId()) {
1628            return info.getConnectionId().getValue();
1629        }
1630        waitForBrokerInfo();
1631        if (brokerInfo == null) {
1632            throw new JMSException("Connection failed before Broker info was received.");
1633        }
1634        return brokerInfo.getBrokerId().getValue();
1635    }
1636
1637    /**
1638     * Returns the broker name if one is available or null if one is not
1639     * available yet.
1640     */
1641    public String getBrokerName() {
1642        try {
1643            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1644            if (brokerInfo == null) {
1645                return null;
1646            }
1647            return brokerInfo.getBrokerName();
1648        } catch (InterruptedException e) {
1649            Thread.currentThread().interrupt();
1650            return null;
1651        }
1652    }
1653
1654    /**
1655     * Returns the broker information if it is available or null if it is not
1656     * available yet.
1657     */
1658    public BrokerInfo getBrokerInfo() {
1659        return brokerInfo;
1660    }
1661
1662    /**
1663     * @return Returns the RedeliveryPolicy.
1664     * @throws JMSException
1665     */
1666    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1667        return redeliveryPolicyMap.getDefaultEntry();
1668    }
1669
1670    /**
1671     * Sets the redelivery policy to be used when messages are rolled back
1672     */
1673    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1674        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1675    }
1676
1677    public BlobTransferPolicy getBlobTransferPolicy() {
1678        if (blobTransferPolicy == null) {
1679            blobTransferPolicy = createBlobTransferPolicy();
1680        }
1681        return blobTransferPolicy;
1682    }
1683
1684    /**
1685     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1686     * OBjects) are transferred from producers to brokers to consumers
1687     */
1688    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1689        this.blobTransferPolicy = blobTransferPolicy;
1690    }
1691
1692    /**
1693     * @return Returns the alwaysSessionAsync.
1694     */
1695    public boolean isAlwaysSessionAsync() {
1696        return alwaysSessionAsync;
1697    }
1698
1699    /**
1700     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1701     * the Connection. However, a separate thread is always used if there is more than one session, or the session
1702     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1703     * happens asynchronously.
1704     */
1705    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1706        this.alwaysSessionAsync = alwaysSessionAsync;
1707    }
1708
1709    /**
1710     * @return Returns the optimizeAcknowledge.
1711     */
1712    public boolean isOptimizeAcknowledge() {
1713        return optimizeAcknowledge;
1714    }
1715
1716    /**
1717     * Enables an optimised acknowledgement mode where messages are acknowledged
1718     * in batches rather than individually
1719     *
1720     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1721     */
1722    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1723        this.optimizeAcknowledge = optimizeAcknowledge;
1724    }
1725
1726    /**
1727     * The max time in milliseconds between optimized ack batches
1728     * @param optimizeAcknowledgeTimeOut
1729     */
1730    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1731        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1732    }
1733
1734    public long getOptimizeAcknowledgeTimeOut() {
1735        return optimizeAcknowledgeTimeOut;
1736    }
1737
1738    public long getWarnAboutUnstartedConnectionTimeout() {
1739        return warnAboutUnstartedConnectionTimeout;
1740    }
1741
1742    /**
1743     * Enables the timeout from a connection creation to when a warning is
1744     * generated if the connection is not properly started via {@link #start()}
1745     * and a message is received by a consumer. It is a very common gotcha to
1746     * forget to <a
1747     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1748     * the connection</a> so this option makes the default case to create a
1749     * warning if the user forgets. To disable the warning just set the value to <
1750     * 0 (say -1).
1751     */
1752    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1753        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1754    }
1755
1756    /**
1757     * @return the sendTimeout (in milliseconds)
1758     */
1759    public int getSendTimeout() {
1760        return sendTimeout;
1761    }
1762
1763    /**
1764     * @param sendTimeout the sendTimeout to set (in milliseconds)
1765     */
1766    public void setSendTimeout(int sendTimeout) {
1767        this.sendTimeout = sendTimeout;
1768    }
1769
1770    /**
1771     * @return the sendAcksAsync
1772     */
1773    public boolean isSendAcksAsync() {
1774        return sendAcksAsync;
1775    }
1776
1777    /**
1778     * @param sendAcksAsync the sendAcksAsync to set
1779     */
1780    public void setSendAcksAsync(boolean sendAcksAsync) {
1781        this.sendAcksAsync = sendAcksAsync;
1782    }
1783
1784    /**
1785     * Returns the time this connection was created
1786     */
1787    public long getTimeCreated() {
1788        return timeCreated;
1789    }
1790
1791    private void waitForBrokerInfo() throws JMSException {
1792        try {
1793            brokerInfoReceived.await();
1794        } catch (InterruptedException e) {
1795            Thread.currentThread().interrupt();
1796            throw JMSExceptionSupport.create(e);
1797        }
1798    }
1799
1800    // Package protected so that it can be used in unit tests
1801    public Transport getTransport() {
1802        return transport;
1803    }
1804
1805    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1806        producers.put(producerId, producer);
1807    }
1808
1809    public void removeProducer(ProducerId producerId) {
1810        producers.remove(producerId);
1811    }
1812
1813    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1814        dispatchers.put(consumerId, dispatcher);
1815    }
1816
1817    public void removeDispatcher(ConsumerId consumerId) {
1818        dispatchers.remove(consumerId);
1819    }
1820
1821    public boolean hasDispatcher(ConsumerId consumerId) {
1822        return dispatchers.containsKey(consumerId);
1823    }
1824
1825    /**
1826     * @param o - the command to consume
1827     */
1828    @Override
1829    public void onCommand(final Object o) {
1830        final Command command = (Command)o;
1831        if (!closed.get() && command != null) {
1832            try {
1833                command.visit(new CommandVisitorAdapter() {
1834                    @Override
1835                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1836                        waitForTransportInterruptionProcessingToComplete();
1837                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1838                        if (dispatcher != null) {
1839                            // Copy in case a embedded broker is dispatching via
1840                            // vm://
1841                            // md.getMessage() == null to signal end of queue
1842                            // browse.
1843                            Message msg = md.getMessage();
1844                            if (msg != null) {
1845                                msg = msg.copy();
1846                                msg.setReadOnlyBody(true);
1847                                msg.setReadOnlyProperties(true);
1848                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1849                                msg.setConnection(ActiveMQConnection.this);
1850                                msg.setMemoryUsage(null);
1851                                md.setMessage(msg);
1852                            }
1853                            dispatcher.dispatch(md);
1854                        } else {
1855                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
1856                        }
1857                        return null;
1858                    }
1859
1860                    @Override
1861                    public Response processProducerAck(ProducerAck pa) throws Exception {
1862                        if (pa != null && pa.getProducerId() != null) {
1863                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1864                            if (producer != null) {
1865                                producer.onProducerAck(pa);
1866                            }
1867                        }
1868                        return null;
1869                    }
1870
1871                    @Override
1872                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1873                        brokerInfo = info;
1874                        brokerInfoReceived.countDown();
1875                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1876                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1877                        return null;
1878                    }
1879
1880                    @Override
1881                    public Response processConnectionError(final ConnectionError error) throws Exception {
1882                        executor.execute(new Runnable() {
1883                            @Override
1884                            public void run() {
1885                                onAsyncException(error.getException());
1886                            }
1887                        });
1888                        return null;
1889                    }
1890
1891                    @Override
1892                    public Response processControlCommand(ControlCommand command) throws Exception {
1893                        onControlCommand(command);
1894                        return null;
1895                    }
1896
1897                    @Override
1898                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1899                        onConnectionControl((ConnectionControl)command);
1900                        return null;
1901                    }
1902
1903                    @Override
1904                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1905                        onConsumerControl((ConsumerControl)command);
1906                        return null;
1907                    }
1908
1909                    @Override
1910                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1911                        onWireFormatInfo((WireFormatInfo)command);
1912                        return null;
1913                    }
1914                });
1915            } catch (Exception e) {
1916                onClientInternalException(e);
1917            }
1918        }
1919
1920        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1921            TransportListener listener = iter.next();
1922            listener.onCommand(command);
1923        }
1924    }
1925
1926    protected void onWireFormatInfo(WireFormatInfo info) {
1927        protocolVersion.set(info.getVersion());
1928    }
1929
1930    /**
1931     * Handles async client internal exceptions.
1932     * A client internal exception is usually one that has been thrown
1933     * by a container runtime component during asynchronous processing of a
1934     * message that does not affect the connection itself.
1935     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1936     * its <code>onException</code> method, if one has been registered with this connection.
1937     *
1938     * @param error the exception that the problem
1939     */
1940    public void onClientInternalException(final Throwable error) {
1941        if ( !closed.get() && !closing.get() ) {
1942            if ( this.clientInternalExceptionListener != null ) {
1943                executor.execute(new Runnable() {
1944                    @Override
1945                    public void run() {
1946                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1947                    }
1948                });
1949            } else {
1950                LOG.debug("Async client internal exception occurred with no exception listener registered: "
1951                        + error, error);
1952            }
1953        }
1954    }
1955
1956    /**
1957     * Used for handling async exceptions
1958     *
1959     * @param error
1960     */
1961    public void onAsyncException(Throwable error) {
1962        if (!closed.get() && !closing.get()) {
1963            if (this.exceptionListener != null) {
1964
1965                if (!(error instanceof JMSException)) {
1966                    error = JMSExceptionSupport.create(error);
1967                }
1968                final JMSException e = (JMSException)error;
1969
1970                executor.execute(new Runnable() {
1971                    @Override
1972                    public void run() {
1973                        ActiveMQConnection.this.exceptionListener.onException(e);
1974                    }
1975                });
1976
1977            } else {
1978                LOG.debug("Async exception with no exception listener: " + error, error);
1979            }
1980        }
1981    }
1982
1983    @Override
1984    public void onException(final IOException error) {
1985        onAsyncException(error);
1986        if (!closing.get() && !closed.get()) {
1987            executor.execute(new Runnable() {
1988                @Override
1989                public void run() {
1990                    transportFailed(error);
1991                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
1992                    brokerInfoReceived.countDown();
1993                    try {
1994                        doCleanup(true);
1995                    } catch (JMSException e) {
1996                        LOG.warn("Exception during connection cleanup, " + e, e);
1997                    }
1998                    for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1999                        TransportListener listener = iter.next();
2000                        listener.onException(error);
2001                    }
2002                }
2003            });
2004        }
2005    }
2006
2007    @Override
2008    public void transportInterupted() {
2009        transportInterruptionProcessingComplete.set(1);
2010        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2011            ActiveMQSession s = i.next();
2012            s.clearMessagesInProgress(transportInterruptionProcessingComplete);
2013        }
2014
2015        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
2016            connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
2017        }
2018
2019        if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
2020            if (LOG.isDebugEnabled()) {
2021                LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
2022            }
2023            signalInterruptionProcessingNeeded();
2024        }
2025
2026        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2027            TransportListener listener = iter.next();
2028            listener.transportInterupted();
2029        }
2030    }
2031
2032    @Override
2033    public void transportResumed() {
2034        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2035            TransportListener listener = iter.next();
2036            listener.transportResumed();
2037        }
2038    }
2039
2040    /**
2041     * Create the DestinationInfo object for the temporary destination.
2042     *
2043     * @param topic - if its true topic, else queue.
2044     * @return DestinationInfo
2045     * @throws JMSException
2046     */
2047    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2048
2049        // Check if Destination info is of temporary type.
2050        ActiveMQTempDestination dest;
2051        if (topic) {
2052            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2053        } else {
2054            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2055        }
2056
2057        DestinationInfo info = new DestinationInfo();
2058        info.setConnectionId(this.info.getConnectionId());
2059        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2060        info.setDestination(dest);
2061        syncSendPacket(info);
2062
2063        dest.setConnection(this);
2064        activeTempDestinations.put(dest, dest);
2065        return dest;
2066    }
2067
2068    /**
2069     * @param destination
2070     * @throws JMSException
2071     */
2072    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2073
2074        checkClosedOrFailed();
2075
2076        for (ActiveMQSession session : this.sessions) {
2077            if (session.isInUse(destination)) {
2078                throw new JMSException("A consumer is consuming from the temporary destination");
2079            }
2080        }
2081
2082        activeTempDestinations.remove(destination);
2083
2084        DestinationInfo destInfo = new DestinationInfo();
2085        destInfo.setConnectionId(this.info.getConnectionId());
2086        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2087        destInfo.setDestination(destination);
2088        destInfo.setTimeout(0);
2089        syncSendPacket(destInfo);
2090    }
2091
2092    public boolean isDeleted(ActiveMQDestination dest) {
2093
2094        // If we are not watching the advisories.. then
2095        // we will assume that the temp destination does exist.
2096        if (advisoryConsumer == null) {
2097            return false;
2098        }
2099
2100        return !activeTempDestinations.containsValue(dest);
2101    }
2102
2103    public boolean isCopyMessageOnSend() {
2104        return copyMessageOnSend;
2105    }
2106
2107    public LongSequenceGenerator getLocalTransactionIdGenerator() {
2108        return localTransactionIdGenerator;
2109    }
2110
2111    public boolean isUseCompression() {
2112        return useCompression;
2113    }
2114
2115    /**
2116     * Enables the use of compression of the message bodies
2117     */
2118    public void setUseCompression(boolean useCompression) {
2119        this.useCompression = useCompression;
2120    }
2121
2122    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2123
2124        checkClosedOrFailed();
2125        ensureConnectionInfoSent();
2126
2127        DestinationInfo info = new DestinationInfo();
2128        info.setConnectionId(this.info.getConnectionId());
2129        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2130        info.setDestination(destination);
2131        info.setTimeout(0);
2132        syncSendPacket(info);
2133    }
2134
2135    public boolean isDispatchAsync() {
2136        return dispatchAsync;
2137    }
2138
2139    /**
2140     * Enables or disables the default setting of whether or not consumers have
2141     * their messages <a
2142     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2143     * synchronously or asynchronously by the broker</a>. For non-durable
2144     * topics for example we typically dispatch synchronously by default to
2145     * minimize context switches which boost performance. However sometimes its
2146     * better to go slower to ensure that a single blocked consumer socket does
2147     * not block delivery to other consumers.
2148     *
2149     * @param asyncDispatch If true then consumers created on this connection
2150     *                will default to having their messages dispatched
2151     *                asynchronously. The default value is true.
2152     */
2153    public void setDispatchAsync(boolean asyncDispatch) {
2154        this.dispatchAsync = asyncDispatch;
2155    }
2156
2157    public boolean isObjectMessageSerializationDefered() {
2158        return objectMessageSerializationDefered;
2159    }
2160
2161    /**
2162     * When an object is set on an ObjectMessage, the JMS spec requires the
2163     * object to be serialized by that set method. Enabling this flag causes the
2164     * object to not get serialized. The object may subsequently get serialized
2165     * if the message needs to be sent over a socket or stored to disk.
2166     */
2167    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2168        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2169    }
2170
2171    /**
2172     * Unsubscribes a durable subscription that has been created by a client.
2173     * <P>
2174     * This method deletes the state being maintained on behalf of the
2175     * subscriber by its provider.
2176     * <P>
2177     * It is erroneous for a client to delete a durable subscription while there
2178     * is an active <CODE>MessageConsumer </CODE> or
2179     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2180     * message is part of a pending transaction or has not been acknowledged in
2181     * the session.
2182     *
2183     * @param name the name used to identify this subscription
2184     * @throws JMSException if the session fails to unsubscribe to the durable
2185     *                 subscription due to some internal error.
2186     * @throws InvalidDestinationException if an invalid subscription name is
2187     *                 specified.
2188     * @since 1.1
2189     */
2190    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2191        checkClosedOrFailed();
2192        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2193        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2194        rsi.setSubscriptionName(name);
2195        rsi.setClientId(getConnectionInfo().getClientId());
2196        syncSendPacket(rsi);
2197    }
2198
2199    /**
2200     * Internal send method optimized: - It does not copy the message - It can
2201     * only handle ActiveMQ messages. - You can specify if the send is async or
2202     * sync - Does not allow you to send /w a transaction.
2203     */
2204    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2205        checkClosedOrFailed();
2206
2207        if (destination.isTemporary() && isDeleted(destination)) {
2208            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2209        }
2210
2211        msg.setJMSDestination(destination);
2212        msg.setJMSDeliveryMode(deliveryMode);
2213        long expiration = 0L;
2214
2215        if (!isDisableTimeStampsByDefault()) {
2216            long timeStamp = System.currentTimeMillis();
2217            msg.setJMSTimestamp(timeStamp);
2218            if (timeToLive > 0) {
2219                expiration = timeToLive + timeStamp;
2220            }
2221        }
2222
2223        msg.setJMSExpiration(expiration);
2224        msg.setJMSPriority(priority);
2225        msg.setJMSRedelivered(false);
2226        msg.setMessageId(messageId);
2227        msg.onSend();
2228        msg.setProducerId(msg.getMessageId().getProducerId());
2229
2230        if (LOG.isDebugEnabled()) {
2231            LOG.debug("Sending message: " + msg);
2232        }
2233
2234        if (async) {
2235            asyncSendPacket(msg);
2236        } else {
2237            syncSendPacket(msg);
2238        }
2239    }
2240
2241    protected void onControlCommand(ControlCommand command) {
2242        String text = command.getCommand();
2243        if (text != null) {
2244            if ("shutdown".equals(text)) {
2245                LOG.info("JVM told to shutdown");
2246                System.exit(0);
2247            }
2248
2249            // TODO Should we handle the "close" case?
2250            // if (false && "close".equals(text)){
2251            //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2252            //     try {
2253            //         close();
2254            //     } catch (JMSException e) {
2255            //     }
2256            // }
2257        }
2258    }
2259
2260    protected void onConnectionControl(ConnectionControl command) {
2261        if (command.isFaultTolerant()) {
2262            this.optimizeAcknowledge = false;
2263            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2264                ActiveMQSession s = i.next();
2265                s.setOptimizeAcknowledge(false);
2266            }
2267        }
2268    }
2269
2270    protected void onConsumerControl(ConsumerControl command) {
2271        if (command.isClose()) {
2272            for (ActiveMQSession session : this.sessions) {
2273                session.close(command.getConsumerId());
2274            }
2275        } else {
2276            for (ActiveMQSession session : this.sessions) {
2277                session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2278            }
2279            for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2280                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2281                if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2282                    consumerInfo.setPrefetchSize(command.getPrefetch());
2283                }
2284            }
2285        }
2286    }
2287
2288    protected void transportFailed(IOException error) {
2289        transportFailed.set(true);
2290        if (firstFailureError == null) {
2291            firstFailureError = error;
2292        }
2293    }
2294
2295    /**
2296     * Should a JMS message be copied to a new JMS Message object as part of the
2297     * send() method in JMS. This is enabled by default to be compliant with the
2298     * JMS specification. You can disable it if you do not mutate JMS messages
2299     * after they are sent for a performance boost
2300     */
2301    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2302        this.copyMessageOnSend = copyMessageOnSend;
2303    }
2304
2305    @Override
2306    public String toString() {
2307        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2308    }
2309
2310    protected BlobTransferPolicy createBlobTransferPolicy() {
2311        return new BlobTransferPolicy();
2312    }
2313
2314    public int getProtocolVersion() {
2315        return protocolVersion.get();
2316    }
2317
2318    public int getProducerWindowSize() {
2319        return producerWindowSize;
2320    }
2321
2322    public void setProducerWindowSize(int producerWindowSize) {
2323        this.producerWindowSize = producerWindowSize;
2324    }
2325
2326    public void setAuditDepth(int auditDepth) {
2327        connectionAudit.setAuditDepth(auditDepth);
2328    }
2329
2330    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2331        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2332    }
2333
2334    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2335        connectionAudit.removeDispatcher(dispatcher);
2336    }
2337
2338    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2339        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2340    }
2341
2342    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2343        connectionAudit.rollbackDuplicate(dispatcher, message);
2344    }
2345
2346    public IOException getFirstFailureError() {
2347        return firstFailureError;
2348    }
2349
2350    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2351        if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
2352            LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
2353            signalInterruptionProcessingComplete();
2354        }
2355    }
2356
2357    protected void transportInterruptionProcessingComplete() {
2358        if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
2359            signalInterruptionProcessingComplete();
2360        }
2361    }
2362
2363    private void signalInterruptionProcessingComplete() {
2364            if (LOG.isDebugEnabled()) {
2365                LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
2366                        + " for:" + this.getConnectionInfo().getConnectionId());
2367            }
2368
2369            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2370            if (failoverTransport != null) {
2371                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2372                if (LOG.isDebugEnabled()) {
2373                    LOG.debug("notified failover transport (" + failoverTransport
2374                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2375                }
2376            }
2377            transportInterruptionProcessingComplete.set(0);
2378    }
2379
2380    private void signalInterruptionProcessingNeeded() {
2381        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2382        if (failoverTransport != null) {
2383            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2384            if (LOG.isDebugEnabled()) {
2385                LOG.debug("notified failover transport (" + failoverTransport
2386                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2387            }
2388        }
2389    }
2390
2391    /*
2392     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2393     * will wait to receive re dispatched messages.
2394     * default value is 0 so there is no wait by default.
2395     */
2396    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2397        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2398    }
2399
2400    public long getConsumerFailoverRedeliveryWaitPeriod() {
2401        return consumerFailoverRedeliveryWaitPeriod;
2402    }
2403
2404    protected Scheduler getScheduler() throws JMSException {
2405        Scheduler result = scheduler;
2406        if (result == null) {
2407            if (isClosing() || isClosed()) {
2408                // without lock contention report the closing state
2409                throw new ConnectionClosedException();
2410            }
2411            synchronized (this) {
2412                result = scheduler;
2413                if (result == null) {
2414                    checkClosed();
2415                    try {
2416                        result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2417                        result.start();
2418                        scheduler = result;
2419                    } catch(Exception e) {
2420                        throw JMSExceptionSupport.create(e);
2421                    }
2422                }
2423            }
2424        }
2425        return result;
2426    }
2427
2428    protected ThreadPoolExecutor getExecutor() {
2429        return this.executor;
2430    }
2431
2432    protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
2433        return sessions;
2434    }
2435
2436    /**
2437     * @return the checkForDuplicates
2438     */
2439    public boolean isCheckForDuplicates() {
2440        return this.checkForDuplicates;
2441    }
2442
2443    /**
2444     * @param checkForDuplicates the checkForDuplicates to set
2445     */
2446    public void setCheckForDuplicates(boolean checkForDuplicates) {
2447        this.checkForDuplicates = checkForDuplicates;
2448    }
2449
2450    public boolean isTransactedIndividualAck() {
2451        return transactedIndividualAck;
2452    }
2453
2454    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2455        this.transactedIndividualAck = transactedIndividualAck;
2456    }
2457
2458    public boolean isNonBlockingRedelivery() {
2459        return nonBlockingRedelivery;
2460    }
2461
2462    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2463        this.nonBlockingRedelivery = nonBlockingRedelivery;
2464    }
2465
2466    public boolean isRmIdFromConnectionId() {
2467        return rmIdFromConnectionId;
2468    }
2469
2470    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
2471        this.rmIdFromConnectionId = rmIdFromConnectionId;
2472    }
2473
2474    /**
2475     * Removes any TempDestinations that this connection has cached, ignoring
2476     * any exceptions generated because the destination is in use as they should
2477     * not be removed.
2478     * Used from a pooled connection, b/c it will not be explicitly closed.
2479     */
2480    public void cleanUpTempDestinations() {
2481
2482        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2483            return;
2484        }
2485
2486        Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2487            = this.activeTempDestinations.entrySet().iterator();
2488        while(entries.hasNext()) {
2489            ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2490            try {
2491                // Only delete this temp destination if it was created from this connection. The connection used
2492                // for the advisory consumer may also have a reference to this temp destination.
2493                ActiveMQTempDestination dest = entry.getValue();
2494                String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2495                if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2496                    this.deleteTempDestination(entry.getValue());
2497                }
2498            } catch (Exception ex) {
2499                // the temp dest is in use so it can not be deleted.
2500                // it is ok to leave it to connection tear down phase
2501            }
2502        }
2503    }
2504
2505    /**
2506     * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2507     * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2508     */
2509    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2510        this.redeliveryPolicyMap = redeliveryPolicyMap;
2511    }
2512
2513    /**
2514     * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2515     * Consumers when dealing with transaction messages that have been rolled back.
2516     *
2517     * @return the redeliveryPolicyMap
2518     */
2519    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2520        return redeliveryPolicyMap;
2521    }
2522
2523    public int getMaxThreadPoolSize() {
2524        return maxThreadPoolSize;
2525    }
2526
2527    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
2528        this.maxThreadPoolSize = maxThreadPoolSize;
2529    }
2530
2531    /**
2532     * Enable enforcement of QueueConnection semantics.
2533     *
2534     * @return this object, useful for chaining
2535     */
2536    ActiveMQConnection enforceQueueOnlyConnection() {
2537        this.queueOnlyConnection = true;
2538        return this;
2539    }
2540
2541    public RejectedExecutionHandler getRejectedTaskHandler() {
2542        return rejectedTaskHandler;
2543    }
2544
2545    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
2546        this.rejectedTaskHandler = rejectedTaskHandler;
2547    }
2548
2549    /**
2550     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2551     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
2552     * will not do any background Message acknowledgment.
2553     *
2554     * @return the scheduledOptimizedAckInterval
2555     */
2556    public long getOptimizedAckScheduledAckInterval() {
2557        return optimizedAckScheduledAckInterval;
2558    }
2559
2560    /**
2561     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2562     * have been configured with optimizeAcknowledge enabled.
2563     *
2564     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
2565     */
2566    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2567        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2568    }
2569
2570    /**
2571     * @return true if MessageConsumer instance will check for expired messages before dispatch.
2572     */
2573    public boolean isConsumerExpiryCheckEnabled() {
2574        return consumerExpiryCheckEnabled;
2575    }
2576
2577    /**
2578     * Controls whether message expiration checking is done in each MessageConsumer
2579     * prior to dispatching a message.  Disabling this check can lead to consumption
2580     * of expired messages.
2581     *
2582     * @param consumerExpiryCheckEnabled
2583     *        controls whether expiration checking is done prior to dispatch.
2584     */
2585    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
2586        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
2587    }
2588}