001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.CopyOnWriteArrayList;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.LinkedBlockingQueue;
032import java.util.concurrent.RejectedExecutionHandler;
033import java.util.concurrent.ThreadFactory;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.AtomicInteger;
038
039import javax.jms.Connection;
040import javax.jms.ConnectionConsumer;
041import javax.jms.ConnectionMetaData;
042import javax.jms.Destination;
043import javax.jms.ExceptionListener;
044import javax.jms.IllegalStateException;
045import javax.jms.InvalidDestinationException;
046import javax.jms.JMSException;
047import javax.jms.Queue;
048import javax.jms.QueueConnection;
049import javax.jms.QueueSession;
050import javax.jms.ServerSessionPool;
051import javax.jms.Session;
052import javax.jms.Topic;
053import javax.jms.TopicConnection;
054import javax.jms.TopicSession;
055import javax.jms.XAConnection;
056
057import org.apache.activemq.advisory.DestinationSource;
058import org.apache.activemq.blob.BlobTransferPolicy;
059import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
060import org.apache.activemq.command.ActiveMQDestination;
061import org.apache.activemq.command.ActiveMQMessage;
062import org.apache.activemq.command.ActiveMQTempDestination;
063import org.apache.activemq.command.ActiveMQTempQueue;
064import org.apache.activemq.command.ActiveMQTempTopic;
065import org.apache.activemq.command.BrokerInfo;
066import org.apache.activemq.command.Command;
067import org.apache.activemq.command.CommandTypes;
068import org.apache.activemq.command.ConnectionControl;
069import org.apache.activemq.command.ConnectionError;
070import org.apache.activemq.command.ConnectionId;
071import org.apache.activemq.command.ConnectionInfo;
072import org.apache.activemq.command.ConsumerControl;
073import org.apache.activemq.command.ConsumerId;
074import org.apache.activemq.command.ConsumerInfo;
075import org.apache.activemq.command.ControlCommand;
076import org.apache.activemq.command.DestinationInfo;
077import org.apache.activemq.command.ExceptionResponse;
078import org.apache.activemq.command.Message;
079import org.apache.activemq.command.MessageDispatch;
080import org.apache.activemq.command.MessageId;
081import org.apache.activemq.command.ProducerAck;
082import org.apache.activemq.command.ProducerId;
083import org.apache.activemq.command.RemoveInfo;
084import org.apache.activemq.command.RemoveSubscriptionInfo;
085import org.apache.activemq.command.Response;
086import org.apache.activemq.command.SessionId;
087import org.apache.activemq.command.ShutdownInfo;
088import org.apache.activemq.command.WireFormatInfo;
089import org.apache.activemq.management.JMSConnectionStatsImpl;
090import org.apache.activemq.management.JMSStatsImpl;
091import org.apache.activemq.management.StatsCapable;
092import org.apache.activemq.management.StatsImpl;
093import org.apache.activemq.state.CommandVisitorAdapter;
094import org.apache.activemq.thread.Scheduler;
095import org.apache.activemq.thread.TaskRunnerFactory;
096import org.apache.activemq.transport.FutureResponse;
097import org.apache.activemq.transport.RequestTimedOutIOException;
098import org.apache.activemq.transport.ResponseCallback;
099import org.apache.activemq.transport.Transport;
100import org.apache.activemq.transport.TransportListener;
101import org.apache.activemq.transport.failover.FailoverTransport;
102import org.apache.activemq.util.IdGenerator;
103import org.apache.activemq.util.IntrospectionSupport;
104import org.apache.activemq.util.JMSExceptionSupport;
105import org.apache.activemq.util.LongSequenceGenerator;
106import org.apache.activemq.util.ServiceSupport;
107import org.apache.activemq.util.ThreadPoolUtils;
108import org.slf4j.Logger;
109import org.slf4j.LoggerFactory;
110
111public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
112
113    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
114    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
115    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
116    public static int DEFAULT_THREAD_POOL_SIZE = 1000;
117
118    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
119
120    public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
121
122    protected boolean dispatchAsync=true;
123    protected boolean alwaysSessionAsync = true;
124
125    private TaskRunnerFactory sessionTaskRunner;
126    private final ThreadPoolExecutor executor;
127
128    // Connection state variables
129    private final ConnectionInfo info;
130    private ExceptionListener exceptionListener;
131    private ClientInternalExceptionListener clientInternalExceptionListener;
132    private boolean clientIDSet;
133    private boolean isConnectionInfoSentToBroker;
134    private boolean userSpecifiedClientID;
135
136    // Configuration options variables
137    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
138    private BlobTransferPolicy blobTransferPolicy;
139    private RedeliveryPolicyMap redeliveryPolicyMap;
140    private MessageTransformer transformer;
141
142    private boolean disableTimeStampsByDefault;
143    private boolean optimizedMessageDispatch = true;
144    private boolean copyMessageOnSend = true;
145    private boolean useCompression;
146    private boolean objectMessageSerializationDefered;
147    private boolean useAsyncSend;
148    private boolean optimizeAcknowledge;
149    private long optimizeAcknowledgeTimeOut = 0;
150    private long optimizedAckScheduledAckInterval = 0;
151    private boolean nestedMapAndListEnabled = true;
152    private boolean useRetroactiveConsumer;
153    private boolean exclusiveConsumer;
154    private boolean alwaysSyncSend;
155    private int closeTimeout = 15000;
156    private boolean watchTopicAdvisories = true;
157    private long warnAboutUnstartedConnectionTimeout = 500L;
158    private int sendTimeout =0;
159    private boolean sendAcksAsync=true;
160    private boolean checkForDuplicates = true;
161    private boolean queueOnlyConnection = false;
162    private boolean consumerExpiryCheckEnabled = true;
163
164    private final Transport transport;
165    private final IdGenerator clientIdGenerator;
166    private final JMSStatsImpl factoryStats;
167    private final JMSConnectionStatsImpl stats;
168
169    private final AtomicBoolean started = new AtomicBoolean(false);
170    private final AtomicBoolean closing = new AtomicBoolean(false);
171    private final AtomicBoolean closed = new AtomicBoolean(false);
172    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
173    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
174    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
175    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
176
177    // Maps ConsumerIds to ActiveMQConsumer objects
178    private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
179    private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
180    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
181    private final SessionId connectionSessionId;
182    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
183    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
184    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
185
186    private AdvisoryConsumer advisoryConsumer;
187    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
188    private BrokerInfo brokerInfo;
189    private IOException firstFailureError;
190    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
191
192    // Assume that protocol is the latest. Change to the actual protocol
193    // version when a WireFormatInfo is received.
194    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
195    private final long timeCreated;
196    private final ConnectionAudit connectionAudit = new ConnectionAudit();
197    private DestinationSource destinationSource;
198    private final Object ensureConnectionInfoSentMutex = new Object();
199    private boolean useDedicatedTaskRunner;
200    protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
201    private long consumerFailoverRedeliveryWaitPeriod;
202    private Scheduler scheduler;
203    private boolean messagePrioritySupported = false;
204    private boolean transactedIndividualAck = false;
205    private boolean nonBlockingRedelivery = false;
206    private boolean rmIdFromConnectionId = false;
207
208    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
209    private RejectedExecutionHandler rejectedTaskHandler = null;
210
211    private List<String> trustedPackages = new ArrayList<String>();
212    private boolean trustAllPackages = false;
213        private int connectResponseTimeout;
214
215    /**
216     * Construct an <code>ActiveMQConnection</code>
217     *
218     * @param transport
219     * @param factoryStats
220     * @throws Exception
221     */
222    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
223
224        this.transport = transport;
225        this.clientIdGenerator = clientIdGenerator;
226        this.factoryStats = factoryStats;
227
228        // Configure a single threaded executor who's core thread can timeout if
229        // idle
230        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
231            @Override
232            public Thread newThread(Runnable r) {
233                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
234                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
235                //thread.setDaemon(true);
236                return thread;
237            }
238        });
239        // asyncConnectionThread.allowCoreThreadTimeOut(true);
240        String uniqueId = connectionIdGenerator.generateId();
241        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
242        this.info.setManageable(true);
243        this.info.setFaultTolerant(transport.isFaultTolerant());
244        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
245
246        this.transport.setTransportListener(this);
247
248        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
249        this.factoryStats.addConnection(this);
250        this.timeCreated = System.currentTimeMillis();
251        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
252    }
253
254    protected void setUserName(String userName) {
255        this.info.setUserName(userName);
256    }
257
258    protected void setPassword(String password) {
259        this.info.setPassword(password);
260    }
261
262    /**
263     * A static helper method to create a new connection
264     *
265     * @return an ActiveMQConnection
266     * @throws JMSException
267     */
268    public static ActiveMQConnection makeConnection() throws JMSException {
269        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
270        return (ActiveMQConnection)factory.createConnection();
271    }
272
273    /**
274     * A static helper method to create a new connection
275     *
276     * @param uri
277     * @return and ActiveMQConnection
278     * @throws JMSException
279     */
280    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
281        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
282        return (ActiveMQConnection)factory.createConnection();
283    }
284
285    /**
286     * A static helper method to create a new connection
287     *
288     * @param user
289     * @param password
290     * @param uri
291     * @return an ActiveMQConnection
292     * @throws JMSException
293     */
294    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
295        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
296        return (ActiveMQConnection)factory.createConnection();
297    }
298
299    /**
300     * @return a number unique for this connection
301     */
302    public JMSConnectionStatsImpl getConnectionStats() {
303        return stats;
304    }
305
306    /**
307     * Creates a <CODE>Session</CODE> object.
308     *
309     * @param transacted indicates whether the session is transacted
310     * @param acknowledgeMode indicates whether the consumer or the client will
311     *                acknowledge any messages it receives; ignored if the
312     *                session is transacted. Legal values are
313     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
314     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
315     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
316     * @return a newly created session
317     * @throws JMSException if the <CODE>Connection</CODE> object fails to
318     *                 create a session due to some internal error or lack of
319     *                 support for the specific transaction and acknowledgement
320     *                 mode.
321     * @see Session#AUTO_ACKNOWLEDGE
322     * @see Session#CLIENT_ACKNOWLEDGE
323     * @see Session#DUPS_OK_ACKNOWLEDGE
324     * @since 1.1
325     */
326    @Override
327    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
328        checkClosedOrFailed();
329        ensureConnectionInfoSent();
330        if (!transacted) {
331            if (acknowledgeMode == Session.SESSION_TRANSACTED) {
332                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
333            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
334                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
335                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
336            }
337        }
338        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : acknowledgeMode, isDispatchAsync(), isAlwaysSessionAsync());
339    }
340
341    /**
342     * @return sessionId
343     */
344    protected SessionId getNextSessionId() {
345        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
346    }
347
348    /**
349     * Gets the client identifier for this connection.
350     * <P>
351     * This value is specific to the JMS provider. It is either preconfigured by
352     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
353     * dynamically by the application by calling the <code>setClientID</code>
354     * method.
355     *
356     * @return the unique client identifier
357     * @throws JMSException if the JMS provider fails to return the client ID
358     *                 for this connection due to some internal error.
359     */
360    @Override
361    public String getClientID() throws JMSException {
362        checkClosedOrFailed();
363        return this.info.getClientId();
364    }
365
366    /**
367     * Sets the client identifier for this connection.
368     * <P>
369     * The preferred way to assign a JMS client's client identifier is for it to
370     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
371     * object and transparently assigned to the <CODE>Connection</CODE> object
372     * it creates.
373     * <P>
374     * Alternatively, a client can set a connection's client identifier using a
375     * provider-specific value. The facility to set a connection's client
376     * identifier explicitly is not a mechanism for overriding the identifier
377     * that has been administratively configured. It is provided for the case
378     * where no administratively specified identifier exists. If one does exist,
379     * an attempt to change it by setting it must throw an
380     * <CODE>IllegalStateException</CODE>. If a client sets the client
381     * identifier explicitly, it must do so immediately after it creates the
382     * connection and before any other action on the connection is taken. After
383     * this point, setting the client identifier is a programming error that
384     * should throw an <CODE>IllegalStateException</CODE>.
385     * <P>
386     * The purpose of the client identifier is to associate a connection and its
387     * objects with a state maintained on behalf of the client by a provider.
388     * The only such state identified by the JMS API is that required to support
389     * durable subscriptions.
390     * <P>
391     * If another connection with the same <code>clientID</code> is already
392     * running when this method is called, the JMS provider should detect the
393     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
394     *
395     * @param newClientID the unique client identifier
396     * @throws JMSException if the JMS provider fails to set the client ID for
397     *                 this connection due to some internal error.
398     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
399     *                 invalid or duplicate client ID.
400     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
401     *                 a connection's client ID at the wrong time or when it has
402     *                 been administratively configured.
403     */
404    @Override
405    public void setClientID(String newClientID) throws JMSException {
406        checkClosedOrFailed();
407
408        if (this.clientIDSet) {
409            throw new IllegalStateException("The clientID has already been set");
410        }
411
412        if (this.isConnectionInfoSentToBroker) {
413            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
414        }
415
416        this.info.setClientId(newClientID);
417        this.userSpecifiedClientID = true;
418        ensureConnectionInfoSent();
419    }
420
421    /**
422     * Sets the default client id that the connection will use if explicitly not
423     * set with the setClientId() call.
424     */
425    public void setDefaultClientID(String clientID) throws JMSException {
426        this.info.setClientId(clientID);
427        this.userSpecifiedClientID = true;
428    }
429
430    /**
431     * Gets the metadata for this connection.
432     *
433     * @return the connection metadata
434     * @throws JMSException if the JMS provider fails to get the connection
435     *                 metadata for this connection.
436     * @see javax.jms.ConnectionMetaData
437     */
438    @Override
439    public ConnectionMetaData getMetaData() throws JMSException {
440        checkClosedOrFailed();
441        return ActiveMQConnectionMetaData.INSTANCE;
442    }
443
444    /**
445     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
446     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
447     * associated with it.
448     *
449     * @return the <CODE>ExceptionListener</CODE> for this connection, or
450     *         null, if no <CODE>ExceptionListener</CODE> is associated with
451     *         this connection.
452     * @throws JMSException if the JMS provider fails to get the
453     *                 <CODE>ExceptionListener</CODE> for this connection.
454     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
455     */
456    @Override
457    public ExceptionListener getExceptionListener() throws JMSException {
458        checkClosedOrFailed();
459        return this.exceptionListener;
460    }
461
462    /**
463     * Sets an exception listener for this connection.
464     * <P>
465     * If a JMS provider detects a serious problem with a connection, it informs
466     * the connection's <CODE> ExceptionListener</CODE>, if one has been
467     * registered. It does this by calling the listener's <CODE>onException
468     * </CODE>
469     * method, passing it a <CODE>JMSException</CODE> object describing the
470     * problem.
471     * <P>
472     * An exception listener allows a client to be notified of a problem
473     * asynchronously. Some connections only consume messages, so they would
474     * have no other way to learn their connection has failed.
475     * <P>
476     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
477     * <P>
478     * A JMS provider should attempt to resolve connection problems itself
479     * before it notifies the client of them.
480     *
481     * @param listener the exception listener
482     * @throws JMSException if the JMS provider fails to set the exception
483     *                 listener for this connection.
484     */
485    @Override
486    public void setExceptionListener(ExceptionListener listener) throws JMSException {
487        checkClosedOrFailed();
488        this.exceptionListener = listener;
489    }
490
491    /**
492     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
493     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
494     * associated with it.
495     *
496     * @return the listener or <code>null</code> if no listener is registered with the connection.
497     */
498    public ClientInternalExceptionListener getClientInternalExceptionListener() {
499        return clientInternalExceptionListener;
500    }
501
502    /**
503     * Sets a client internal exception listener for this connection.
504     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
505     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
506     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
507     * describing the problem.
508     *
509     * @param listener the exception listener
510     */
511    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
512        this.clientInternalExceptionListener = listener;
513    }
514
515    /**
516     * Starts (or restarts) a connection's delivery of incoming messages. A call
517     * to <CODE>start</CODE> on a connection that has already been started is
518     * ignored.
519     *
520     * @throws JMSException if the JMS provider fails to start message delivery
521     *                 due to some internal error.
522     * @see javax.jms.Connection#stop()
523     */
524    @Override
525    public void start() throws JMSException {
526        checkClosedOrFailed();
527        ensureConnectionInfoSent();
528        if (started.compareAndSet(false, true)) {
529            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
530                ActiveMQSession session = i.next();
531                session.start();
532            }
533        }
534    }
535
536    /**
537     * Temporarily stops a connection's delivery of incoming messages. Delivery
538     * can be restarted using the connection's <CODE>start</CODE> method. When
539     * the connection is stopped, delivery to all the connection's message
540     * consumers is inhibited: synchronous receives block, and messages are not
541     * delivered to message listeners.
542     * <P>
543     * This call blocks until receives and/or message listeners in progress have
544     * completed.
545     * <P>
546     * Stopping a connection has no effect on its ability to send messages. A
547     * call to <CODE>stop</CODE> on a connection that has already been stopped
548     * is ignored.
549     * <P>
550     * A call to <CODE>stop</CODE> must not return until delivery of messages
551     * has paused. This means that a client can rely on the fact that none of
552     * its message listeners will be called and that all threads of control
553     * waiting for <CODE>receive</CODE> calls to return will not return with a
554     * message until the connection is restarted. The receive timers for a
555     * stopped connection continue to advance, so receives may time out while
556     * the connection is stopped.
557     * <P>
558     * If message listeners are running when <CODE>stop</CODE> is invoked, the
559     * <CODE>stop</CODE> call must wait until all of them have returned before
560     * it may return. While these message listeners are completing, they must
561     * have the full services of the connection available to them.
562     *
563     * @throws JMSException if the JMS provider fails to stop message delivery
564     *                 due to some internal error.
565     * @see javax.jms.Connection#start()
566     */
567    @Override
568    public void stop() throws JMSException {
569        doStop(true);
570    }
571
572    /**
573     * @see #stop()
574     * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed,
575     *                    <tt>false</tt> to skip this check
576     * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
577     */
578    void doStop(boolean checkClosed) throws JMSException {
579        if (checkClosed) {
580            checkClosedOrFailed();
581        }
582        if (started.compareAndSet(true, false)) {
583            synchronized(sessions) {
584                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
585                    ActiveMQSession s = i.next();
586                    s.stop();
587                }
588            }
589        }
590    }
591
592    /**
593     * Closes the connection.
594     * <P>
595     * Since a provider typically allocates significant resources outside the
596     * JVM on behalf of a connection, clients should close these resources when
597     * they are not needed. Relying on garbage collection to eventually reclaim
598     * these resources may not be timely enough.
599     * <P>
600     * There is no need to close the sessions, producers, and consumers of a
601     * closed connection.
602     * <P>
603     * Closing a connection causes all temporary destinations to be deleted.
604     * <P>
605     * When this method is invoked, it should not return until message
606     * processing has been shut down in an orderly fashion. This means that all
607     * message listeners that may have been running have returned, and that all
608     * pending receives have returned. A close terminates all pending message
609     * receives on the connection's sessions' consumers. The receives may return
610     * with a message or with null, depending on whether there was a message
611     * available at the time of the close. If one or more of the connection's
612     * sessions' message listeners is processing a message at the time when
613     * connection <CODE>close</CODE> is invoked, all the facilities of the
614     * connection and its sessions must remain available to those listeners
615     * until they return control to the JMS provider.
616     * <P>
617     * Closing a connection causes any of its sessions' transactions in progress
618     * to be rolled back. In the case where a session's work is coordinated by
619     * an external transaction manager, a session's <CODE>commit</CODE> and
620     * <CODE> rollback</CODE> methods are not used and the result of a closed
621     * session's work is determined later by the transaction manager. Closing a
622     * connection does NOT force an acknowledgment of client-acknowledged
623     * sessions.
624     * <P>
625     * Invoking the <CODE>acknowledge</CODE> method of a received message from
626     * a closed connection's session must throw an
627     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
628     * NOT throw an exception.
629     *
630     * @throws JMSException if the JMS provider fails to close the connection
631     *                 due to some internal error. For example, a failure to
632     *                 release resources or to close a socket connection can
633     *                 cause this exception to be thrown.
634     */
635    @Override
636    public void close() throws JMSException {
637        try {
638            // If we were running, lets stop first.
639            if (!closed.get() && !transportFailed.get()) {
640                // do not fail if already closed as according to JMS spec we must not
641                // throw exception if already closed
642                doStop(false);
643            }
644
645            synchronized (this) {
646                if (!closed.get()) {
647                    closing.set(true);
648
649                    if (destinationSource != null) {
650                        destinationSource.stop();
651                        destinationSource = null;
652                    }
653                    if (advisoryConsumer != null) {
654                        advisoryConsumer.dispose();
655                        advisoryConsumer = null;
656                    }
657
658                    Scheduler scheduler = this.scheduler;
659                    if (scheduler != null) {
660                        try {
661                            scheduler.stop();
662                        } catch (Exception e) {
663                            JMSException ex =  JMSExceptionSupport.create(e);
664                            throw ex;
665                        }
666                    }
667
668                    long lastDeliveredSequenceId = -1;
669                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
670                        ActiveMQSession s = i.next();
671                        s.dispose();
672                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
673                    }
674                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
675                        ActiveMQConnectionConsumer c = i.next();
676                        c.dispose();
677                    }
678
679                    this.activeTempDestinations.clear();
680
681                    try {
682                        if (isConnectionInfoSentToBroker) {
683                            // If we announced ourselves to the broker.. Try to let the broker
684                            // know that the connection is being shutdown.
685                            RemoveInfo removeCommand = info.createRemoveCommand();
686                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
687                            try {
688                                syncSendPacket(removeCommand, closeTimeout);
689                            } catch (JMSException e) {
690                                if (e.getCause() instanceof RequestTimedOutIOException) {
691                                    // expected
692                                } else {
693                                    throw e;
694                                }
695                            }
696                            doAsyncSendPacket(new ShutdownInfo());
697                        }
698                    } finally { // release anyway even if previous communication fails
699                        started.set(false);
700
701                        // TODO if we move the TaskRunnerFactory to the connection
702                        // factory
703                        // then we may need to call
704                        // factory.onConnectionClose(this);
705                        if (sessionTaskRunner != null) {
706                            sessionTaskRunner.shutdown();
707                        }
708                        closed.set(true);
709                        closing.set(false);
710                    }
711                }
712            }
713        } finally {
714            try {
715                if (executor != null) {
716                    ThreadPoolUtils.shutdown(executor);
717                }
718            } catch (Throwable e) {
719                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
720            }
721
722            ServiceSupport.dispose(this.transport);
723
724            factoryStats.removeConnection(this);
725        }
726    }
727
728    /**
729     * Tells the broker to terminate its VM. This can be used to cleanly
730     * terminate a broker running in a standalone java process. Server must have
731     * property enable.vm.shutdown=true defined to allow this to work.
732     */
733    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
734    // implemented.
735    /*
736     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
737     * command = new BrokerAdminCommand();
738     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
739     * asyncSendPacket(command); }
740     */
741
742    /**
743     * Create a durable connection consumer for this connection (optional
744     * operation). This is an expert facility not used by regular JMS clients.
745     *
746     * @param topic topic to access
747     * @param subscriptionName durable subscription name
748     * @param messageSelector only messages with properties matching the message
749     *                selector expression are delivered. A value of null or an
750     *                empty string indicates that there is no message selector
751     *                for the message consumer.
752     * @param sessionPool the server session pool to associate with this durable
753     *                connection consumer
754     * @param maxMessages the maximum number of messages that can be assigned to
755     *                a server session at one time
756     * @return the durable connection consumer
757     * @throws JMSException if the <CODE>Connection</CODE> object fails to
758     *                 create a connection consumer due to some internal error
759     *                 or invalid arguments for <CODE>sessionPool</CODE> and
760     *                 <CODE>messageSelector</CODE>.
761     * @throws javax.jms.InvalidDestinationException if an invalid destination
762     *                 is specified.
763     * @throws javax.jms.InvalidSelectorException if the message selector is
764     *                 invalid.
765     * @see javax.jms.ConnectionConsumer
766     * @since 1.1
767     */
768    @Override
769    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
770        throws JMSException {
771        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
772    }
773
774    /**
775     * Create a durable connection consumer for this connection (optional
776     * operation). This is an expert facility not used by regular JMS clients.
777     *
778     * @param topic topic to access
779     * @param subscriptionName durable subscription name
780     * @param messageSelector only messages with properties matching the message
781     *                selector expression are delivered. A value of null or an
782     *                empty string indicates that there is no message selector
783     *                for the message consumer.
784     * @param sessionPool the server session pool to associate with this durable
785     *                connection consumer
786     * @param maxMessages the maximum number of messages that can be assigned to
787     *                a server session at one time
788     * @param noLocal set true if you want to filter out messages published
789     *                locally
790     * @return the durable connection consumer
791     * @throws JMSException if the <CODE>Connection</CODE> object fails to
792     *                 create a connection consumer due to some internal error
793     *                 or invalid arguments for <CODE>sessionPool</CODE> and
794     *                 <CODE>messageSelector</CODE>.
795     * @throws javax.jms.InvalidDestinationException if an invalid destination
796     *                 is specified.
797     * @throws javax.jms.InvalidSelectorException if the message selector is
798     *                 invalid.
799     * @see javax.jms.ConnectionConsumer
800     * @since 1.1
801     */
802    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
803                                                              boolean noLocal) throws JMSException {
804        checkClosedOrFailed();
805
806        if (queueOnlyConnection) {
807            throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
808        }
809
810        ensureConnectionInfoSent();
811        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
812        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
813        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
814        info.setSubscriptionName(subscriptionName);
815        info.setSelector(messageSelector);
816        info.setPrefetchSize(maxMessages);
817        info.setDispatchAsync(isDispatchAsync());
818
819        // Allows the options on the destination to configure the consumerInfo
820        if (info.getDestination().getOptions() != null) {
821            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
822            IntrospectionSupport.setProperties(this.info, options, "consumer.");
823        }
824
825        return new ActiveMQConnectionConsumer(this, sessionPool, info);
826    }
827
828    // Properties
829    // -------------------------------------------------------------------------
830
831    /**
832     * Returns true if this connection has been started
833     *
834     * @return true if this Connection is started
835     */
836    public boolean isStarted() {
837        return started.get();
838    }
839
840    /**
841     * Returns true if the connection is closed
842     */
843    public boolean isClosed() {
844        return closed.get();
845    }
846
847    /**
848     * Returns true if the connection is in the process of being closed
849     */
850    public boolean isClosing() {
851        return closing.get();
852    }
853
854    /**
855     * Returns true if the underlying transport has failed
856     */
857    public boolean isTransportFailed() {
858        return transportFailed.get();
859    }
860
861    /**
862     * @return Returns the prefetchPolicy.
863     */
864    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
865        return prefetchPolicy;
866    }
867
868    /**
869     * Sets the <a
870     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
871     * policy</a> for consumers created by this connection.
872     */
873    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
874        this.prefetchPolicy = prefetchPolicy;
875    }
876
877    /**
878     */
879    public Transport getTransportChannel() {
880        return transport;
881    }
882
883    /**
884     * @return Returns the clientID of the connection, forcing one to be
885     *         generated if one has not yet been configured.
886     */
887    public String getInitializedClientID() throws JMSException {
888        ensureConnectionInfoSent();
889        return info.getClientId();
890    }
891
892    /**
893     * @return Returns the timeStampsDisableByDefault.
894     */
895    public boolean isDisableTimeStampsByDefault() {
896        return disableTimeStampsByDefault;
897    }
898
899    /**
900     * Sets whether or not timestamps on messages should be disabled or not. If
901     * you disable them it adds a small performance boost.
902     */
903    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
904        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
905    }
906
907    /**
908     * @return Returns the dispatchOptimizedMessage.
909     */
910    public boolean isOptimizedMessageDispatch() {
911        return optimizedMessageDispatch;
912    }
913
914    /**
915     * If this flag is set then an larger prefetch limit is used - only
916     * applicable for durable topic subscribers.
917     */
918    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
919        this.optimizedMessageDispatch = dispatchOptimizedMessage;
920    }
921
922    /**
923     * @return Returns the closeTimeout.
924     */
925    public int getCloseTimeout() {
926        return closeTimeout;
927    }
928
929    /**
930     * Sets the timeout before a close is considered complete. Normally a
931     * close() on a connection waits for confirmation from the broker; this
932     * allows that operation to timeout to save the client hanging if there is
933     * no broker
934     */
935    public void setCloseTimeout(int closeTimeout) {
936        this.closeTimeout = closeTimeout;
937    }
938
939    /**
940     * @return ConnectionInfo
941     */
942    public ConnectionInfo getConnectionInfo() {
943        return this.info;
944    }
945
946    public boolean isUseRetroactiveConsumer() {
947        return useRetroactiveConsumer;
948    }
949
950    /**
951     * Sets whether or not retroactive consumers are enabled. Retroactive
952     * consumers allow non-durable topic subscribers to receive old messages
953     * that were published before the non-durable subscriber started.
954     */
955    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
956        this.useRetroactiveConsumer = useRetroactiveConsumer;
957    }
958
959    public boolean isNestedMapAndListEnabled() {
960        return nestedMapAndListEnabled;
961    }
962
963    /**
964     * Enables/disables whether or not Message properties and MapMessage entries
965     * support <a
966     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
967     * Structures</a> of Map and List objects
968     */
969    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
970        this.nestedMapAndListEnabled = structuredMapsEnabled;
971    }
972
973    public boolean isExclusiveConsumer() {
974        return exclusiveConsumer;
975    }
976
977    /**
978     * Enables or disables whether or not queue consumers should be exclusive or
979     * not for example to preserve ordering when not using <a
980     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
981     *
982     * @param exclusiveConsumer
983     */
984    public void setExclusiveConsumer(boolean exclusiveConsumer) {
985        this.exclusiveConsumer = exclusiveConsumer;
986    }
987
988    /**
989     * Adds a transport listener so that a client can be notified of events in
990     * the underlying transport
991     */
992    public void addTransportListener(TransportListener transportListener) {
993        transportListeners.add(transportListener);
994    }
995
996    public void removeTransportListener(TransportListener transportListener) {
997        transportListeners.remove(transportListener);
998    }
999
1000    public boolean isUseDedicatedTaskRunner() {
1001        return useDedicatedTaskRunner;
1002    }
1003
1004    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1005        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1006    }
1007
1008    public TaskRunnerFactory getSessionTaskRunner() {
1009        synchronized (this) {
1010            if (sessionTaskRunner == null) {
1011                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1012                sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1013            }
1014        }
1015        return sessionTaskRunner;
1016    }
1017
1018    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1019        this.sessionTaskRunner = sessionTaskRunner;
1020    }
1021
1022    public MessageTransformer getTransformer() {
1023        return transformer;
1024    }
1025
1026    /**
1027     * Sets the transformer used to transform messages before they are sent on
1028     * to the JMS bus or when they are received from the bus but before they are
1029     * delivered to the JMS client
1030     */
1031    public void setTransformer(MessageTransformer transformer) {
1032        this.transformer = transformer;
1033    }
1034
1035    /**
1036     * @return the statsEnabled
1037     */
1038    public boolean isStatsEnabled() {
1039        return this.stats.isEnabled();
1040    }
1041
1042    /**
1043     * @param statsEnabled the statsEnabled to set
1044     */
1045    public void setStatsEnabled(boolean statsEnabled) {
1046        this.stats.setEnabled(statsEnabled);
1047    }
1048
1049    /**
1050     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1051     * being created or destroyed or to enquire about the current destinations available on the broker
1052     *
1053     * @return a lazily created destination source
1054     * @throws JMSException
1055     */
1056    @Override
1057    public DestinationSource getDestinationSource() throws JMSException {
1058        if (destinationSource == null) {
1059            destinationSource = new DestinationSource(this);
1060            destinationSource.start();
1061        }
1062        return destinationSource;
1063    }
1064
1065    // Implementation methods
1066    // -------------------------------------------------------------------------
1067
1068    /**
1069     * Used internally for adding Sessions to the Connection
1070     *
1071     * @param session
1072     * @throws JMSException
1073     * @throws JMSException
1074     */
1075    protected void addSession(ActiveMQSession session) throws JMSException {
1076        this.sessions.add(session);
1077        if (sessions.size() > 1 || session.isTransacted()) {
1078            optimizedMessageDispatch = false;
1079        }
1080    }
1081
1082    /**
1083     * Used interanlly for removing Sessions from a Connection
1084     *
1085     * @param session
1086     */
1087    protected void removeSession(ActiveMQSession session) {
1088        this.sessions.remove(session);
1089        this.removeDispatcher(session);
1090    }
1091
1092    /**
1093     * Add a ConnectionConsumer
1094     *
1095     * @param connectionConsumer
1096     * @throws JMSException
1097     */
1098    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1099        this.connectionConsumers.add(connectionConsumer);
1100    }
1101
1102    /**
1103     * Remove a ConnectionConsumer
1104     *
1105     * @param connectionConsumer
1106     */
1107    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1108        this.connectionConsumers.remove(connectionConsumer);
1109        this.removeDispatcher(connectionConsumer);
1110    }
1111
1112    /**
1113     * Creates a <CODE>TopicSession</CODE> object.
1114     *
1115     * @param transacted indicates whether the session is transacted
1116     * @param acknowledgeMode indicates whether the consumer or the client will
1117     *                acknowledge any messages it receives; ignored if the
1118     *                session is transacted. Legal values are
1119     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1120     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1121     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1122     * @return a newly created topic session
1123     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1124     *                 to create a session due to some internal error or lack of
1125     *                 support for the specific transaction and acknowledgement
1126     *                 mode.
1127     * @see Session#AUTO_ACKNOWLEDGE
1128     * @see Session#CLIENT_ACKNOWLEDGE
1129     * @see Session#DUPS_OK_ACKNOWLEDGE
1130     */
1131    @Override
1132    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1133        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1134    }
1135
1136    /**
1137     * Creates a connection consumer for this connection (optional operation).
1138     * This is an expert facility not used by regular JMS clients.
1139     *
1140     * @param topic the topic to access
1141     * @param messageSelector only messages with properties matching the message
1142     *                selector expression are delivered. A value of null or an
1143     *                empty string indicates that there is no message selector
1144     *                for the message consumer.
1145     * @param sessionPool the server session pool to associate with this
1146     *                connection consumer
1147     * @param maxMessages the maximum number of messages that can be assigned to
1148     *                a server session at one time
1149     * @return the connection consumer
1150     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1151     *                 to create a connection consumer due to some internal
1152     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1153     *                 and <CODE>messageSelector</CODE>.
1154     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1155     *                 specified.
1156     * @throws javax.jms.InvalidSelectorException if the message selector is
1157     *                 invalid.
1158     * @see javax.jms.ConnectionConsumer
1159     */
1160    @Override
1161    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1162        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1163    }
1164
1165    /**
1166     * Creates a connection consumer for this connection (optional operation).
1167     * This is an expert facility not used by regular JMS clients.
1168     *
1169     * @param queue the queue to access
1170     * @param messageSelector only messages with properties matching the message
1171     *                selector expression are delivered. A value of null or an
1172     *                empty string indicates that there is no message selector
1173     *                for the message consumer.
1174     * @param sessionPool the server session pool to associate with this
1175     *                connection consumer
1176     * @param maxMessages the maximum number of messages that can be assigned to
1177     *                a server session at one time
1178     * @return the connection consumer
1179     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1180     *                 to create a connection consumer due to some internal
1181     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1182     *                 and <CODE>messageSelector</CODE>.
1183     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1184     *                 specified.
1185     * @throws javax.jms.InvalidSelectorException if the message selector is
1186     *                 invalid.
1187     * @see javax.jms.ConnectionConsumer
1188     */
1189    @Override
1190    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1191        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1192    }
1193
1194    /**
1195     * Creates a connection consumer for this connection (optional operation).
1196     * This is an expert facility not used by regular JMS clients.
1197     *
1198     * @param destination the destination to access
1199     * @param messageSelector only messages with properties matching the message
1200     *                selector expression are delivered. A value of null or an
1201     *                empty string indicates that there is no message selector
1202     *                for the message consumer.
1203     * @param sessionPool the server session pool to associate with this
1204     *                connection consumer
1205     * @param maxMessages the maximum number of messages that can be assigned to
1206     *                a server session at one time
1207     * @return the connection consumer
1208     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1209     *                 create a connection consumer due to some internal error
1210     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1211     *                 <CODE>messageSelector</CODE>.
1212     * @throws javax.jms.InvalidDestinationException if an invalid destination
1213     *                 is specified.
1214     * @throws javax.jms.InvalidSelectorException if the message selector is
1215     *                 invalid.
1216     * @see javax.jms.ConnectionConsumer
1217     * @since 1.1
1218     */
1219    @Override
1220    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1221        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1222    }
1223
1224    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1225        throws JMSException {
1226
1227        checkClosedOrFailed();
1228        ensureConnectionInfoSent();
1229
1230        ConsumerId consumerId = createConsumerId();
1231        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1232        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1233        consumerInfo.setSelector(messageSelector);
1234        consumerInfo.setPrefetchSize(maxMessages);
1235        consumerInfo.setNoLocal(noLocal);
1236        consumerInfo.setDispatchAsync(isDispatchAsync());
1237
1238        // Allows the options on the destination to configure the consumerInfo
1239        if (consumerInfo.getDestination().getOptions() != null) {
1240            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1241            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1242        }
1243
1244        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1245    }
1246
1247    /**
1248     * @return a newly created ConsumedId unique to this connection session instance.
1249     */
1250    private ConsumerId createConsumerId() {
1251        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1252    }
1253
1254    /**
1255     * Creates a <CODE>QueueSession</CODE> object.
1256     *
1257     * @param transacted indicates whether the session is transacted
1258     * @param acknowledgeMode indicates whether the consumer or the client will
1259     *                acknowledge any messages it receives; ignored if the
1260     *                session is transacted. Legal values are
1261     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1262     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1263     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1264     * @return a newly created queue session
1265     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1266     *                 to create a session due to some internal error or lack of
1267     *                 support for the specific transaction and acknowledgement
1268     *                 mode.
1269     * @see Session#AUTO_ACKNOWLEDGE
1270     * @see Session#CLIENT_ACKNOWLEDGE
1271     * @see Session#DUPS_OK_ACKNOWLEDGE
1272     */
1273    @Override
1274    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1275        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1276    }
1277
1278    /**
1279     * Ensures that the clientID was manually specified and not auto-generated.
1280     * If the clientID was not specified this method will throw an exception.
1281     * This method is used to ensure that the clientID + durableSubscriber name
1282     * are used correctly.
1283     *
1284     * @throws JMSException
1285     */
1286    public void checkClientIDWasManuallySpecified() throws JMSException {
1287        if (!userSpecifiedClientID) {
1288            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1289        }
1290    }
1291
1292    /**
1293     * send a Packet through the Connection - for internal use only
1294     *
1295     * @param command
1296     * @throws JMSException
1297     */
1298    public void asyncSendPacket(Command command) throws JMSException {
1299        if (isClosed()) {
1300            throw new ConnectionClosedException();
1301        } else {
1302            doAsyncSendPacket(command);
1303        }
1304    }
1305
1306    private void doAsyncSendPacket(Command command) throws JMSException {
1307        try {
1308            this.transport.oneway(command);
1309        } catch (IOException e) {
1310            throw JMSExceptionSupport.create(e);
1311        }
1312    }
1313
1314    /**
1315     * Send a packet through a Connection - for internal use only
1316     *
1317     * @param command
1318     *
1319     * @throws JMSException
1320     */
1321    public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1322        if(onComplete==null) {
1323            syncSendPacket(command);
1324        } else {
1325            if (isClosed()) {
1326                throw new ConnectionClosedException();
1327            }
1328            try {
1329                this.transport.asyncRequest(command, new ResponseCallback() {
1330                    @Override
1331                    public void onCompletion(FutureResponse resp) {
1332                        Response response;
1333                        Throwable exception = null;
1334                        try {
1335                            response = resp.getResult();
1336                            if (response.isException()) {
1337                                ExceptionResponse er = (ExceptionResponse)response;
1338                                exception = er.getException();
1339                            }
1340                        } catch (Exception e) {
1341                            exception = e;
1342                        }
1343                        if(exception!=null) {
1344                            if ( exception instanceof JMSException) {
1345                                onComplete.onException((JMSException) exception);
1346                            } else {
1347                                if (isClosed()||closing.get()) {
1348                                    LOG.debug("Received an exception but connection is closing");
1349                                }
1350                                JMSException jmsEx = null;
1351                                try {
1352                                    jmsEx = JMSExceptionSupport.create(exception);
1353                                } catch(Throwable e) {
1354                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1355                                }
1356                                // dispose of transport for security exceptions on connection initiation
1357                                if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1358                                    forceCloseOnSecurityException(exception);
1359                                }
1360                                if (jmsEx !=null) {
1361                                    onComplete.onException(jmsEx);
1362                                }
1363                            }
1364                        } else {
1365                            onComplete.onSuccess();
1366                        }
1367                    }
1368                });
1369            } catch (IOException e) {
1370                throw JMSExceptionSupport.create(e);
1371            }
1372        }
1373    }
1374
1375    private void forceCloseOnSecurityException(Throwable exception) {
1376        LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
1377        onException(new IOException("Force close due to SecurityException on connect", exception));
1378    }
1379
1380    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1381        if (isClosed()) {
1382            throw new ConnectionClosedException();
1383        } else {
1384
1385            try {
1386                Response response = (Response)(timeout > 0
1387                        ? this.transport.request(command, timeout)
1388                        : this.transport.request(command));
1389                if (response.isException()) {
1390                    ExceptionResponse er = (ExceptionResponse)response;
1391                    if (er.getException() instanceof JMSException) {
1392                        throw (JMSException)er.getException();
1393                    } else {
1394                        if (isClosed()||closing.get()) {
1395                            LOG.debug("Received an exception but connection is closing");
1396                        }
1397                        JMSException jmsEx = null;
1398                        try {
1399                            jmsEx = JMSExceptionSupport.create(er.getException());
1400                        } catch(Throwable e) {
1401                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1402                        }
1403                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1404                            forceCloseOnSecurityException(er.getException());
1405                        }
1406                        if (jmsEx !=null) {
1407                            throw jmsEx;
1408                        }
1409                    }
1410                }
1411                return response;
1412            } catch (IOException e) {
1413                throw JMSExceptionSupport.create(e);
1414            }
1415        }
1416    }
1417
1418    /**
1419     * Send a packet through a Connection - for internal use only
1420     *
1421     * @param command
1422     *
1423     * @return the broker Response for the given Command.
1424     *
1425     * @throws JMSException
1426     */
1427    public Response syncSendPacket(Command command) throws JMSException {
1428        return syncSendPacket(command, 0);
1429    }
1430
1431    /**
1432     * @return statistics for this Connection
1433     */
1434    @Override
1435    public StatsImpl getStats() {
1436        return stats;
1437    }
1438
1439    /**
1440     * simply throws an exception if the Connection is already closed or the
1441     * Transport has failed
1442     *
1443     * @throws JMSException
1444     */
1445    protected synchronized void checkClosedOrFailed() throws JMSException {
1446        checkClosed();
1447        if (transportFailed.get()) {
1448            throw new ConnectionFailedException(firstFailureError);
1449        }
1450    }
1451
1452    /**
1453     * simply throws an exception if the Connection is already closed
1454     *
1455     * @throws JMSException
1456     */
1457    protected synchronized void checkClosed() throws JMSException {
1458        if (closed.get()) {
1459            throw new ConnectionClosedException();
1460        }
1461    }
1462
1463    /**
1464     * Send the ConnectionInfo to the Broker
1465     *
1466     * @throws JMSException
1467     */
1468    protected void ensureConnectionInfoSent() throws JMSException {
1469        synchronized(this.ensureConnectionInfoSentMutex) {
1470            // Can we skip sending the ConnectionInfo packet??
1471            if (isConnectionInfoSentToBroker || closed.get()) {
1472                return;
1473            }
1474            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1475            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1476                info.setClientId(clientIdGenerator.generateId());
1477            }
1478            syncSendPacket(info.copy(), getConnectResponseTimeout());
1479
1480            this.isConnectionInfoSentToBroker = true;
1481            // Add a temp destination advisory consumer so that
1482            // We know what the valid temporary destinations are on the
1483            // broker without having to do an RPC to the broker.
1484
1485            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1486            if (watchTopicAdvisories) {
1487                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1488            }
1489        }
1490    }
1491
1492    public synchronized boolean isWatchTopicAdvisories() {
1493        return watchTopicAdvisories;
1494    }
1495
1496    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1497        this.watchTopicAdvisories = watchTopicAdvisories;
1498    }
1499
1500    /**
1501     * @return Returns the useAsyncSend.
1502     */
1503    public boolean isUseAsyncSend() {
1504        return useAsyncSend;
1505    }
1506
1507    /**
1508     * Forces the use of <a
1509     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1510     * adds a massive performance boost; but means that the send() method will
1511     * return immediately whether the message has been sent or not which could
1512     * lead to message loss.
1513     */
1514    public void setUseAsyncSend(boolean useAsyncSend) {
1515        this.useAsyncSend = useAsyncSend;
1516    }
1517
1518    /**
1519     * @return true if always sync send messages
1520     */
1521    public boolean isAlwaysSyncSend() {
1522        return this.alwaysSyncSend;
1523    }
1524
1525    /**
1526     * Set true if always require messages to be sync sent
1527     *
1528     * @param alwaysSyncSend
1529     */
1530    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1531        this.alwaysSyncSend = alwaysSyncSend;
1532    }
1533
1534    /**
1535     * @return the messagePrioritySupported
1536     */
1537    public boolean isMessagePrioritySupported() {
1538        return this.messagePrioritySupported;
1539    }
1540
1541    /**
1542     * @param messagePrioritySupported the messagePrioritySupported to set
1543     */
1544    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1545        this.messagePrioritySupported = messagePrioritySupported;
1546    }
1547
1548    /**
1549     * Cleans up this connection so that it's state is as if the connection was
1550     * just created. This allows the Resource Adapter to clean up a connection
1551     * so that it can be reused without having to close and recreate the
1552     * connection.
1553     */
1554    public void cleanup() throws JMSException {
1555        doCleanup(false);
1556    }
1557
1558    public void doCleanup(boolean removeConnection) throws JMSException {
1559        if (advisoryConsumer != null && !isTransportFailed()) {
1560            advisoryConsumer.dispose();
1561            advisoryConsumer = null;
1562        }
1563
1564        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1565            ActiveMQSession s = i.next();
1566            s.dispose();
1567        }
1568        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1569            ActiveMQConnectionConsumer c = i.next();
1570            c.dispose();
1571        }
1572
1573        if (removeConnection) {
1574            if (isConnectionInfoSentToBroker) {
1575                if (!transportFailed.get() && !closing.get()) {
1576                    syncSendPacket(info.createRemoveCommand());
1577                }
1578                isConnectionInfoSentToBroker = false;
1579            }
1580            if (userSpecifiedClientID) {
1581                info.setClientId(null);
1582                userSpecifiedClientID = false;
1583            }
1584            clientIDSet = false;
1585        }
1586
1587        started.set(false);
1588    }
1589
1590    /**
1591     * Changes the associated username/password that is associated with this
1592     * connection. If the connection has been used, you must called cleanup()
1593     * before calling this method.
1594     *
1595     * @throws IllegalStateException if the connection is in used.
1596     */
1597    public void changeUserInfo(String userName, String password) throws JMSException {
1598        if (isConnectionInfoSentToBroker) {
1599            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1600        }
1601        this.info.setUserName(userName);
1602        this.info.setPassword(password);
1603    }
1604
1605    /**
1606     * @return Returns the resourceManagerId.
1607     * @throws JMSException
1608     */
1609    public String getResourceManagerId() throws JMSException {
1610        if (isRmIdFromConnectionId()) {
1611            return info.getConnectionId().getValue();
1612        }
1613        waitForBrokerInfo();
1614        if (brokerInfo == null) {
1615            throw new JMSException("Connection failed before Broker info was received.");
1616        }
1617        return brokerInfo.getBrokerId().getValue();
1618    }
1619
1620    /**
1621     * Returns the broker name if one is available or null if one is not
1622     * available yet.
1623     */
1624    public String getBrokerName() {
1625        try {
1626            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1627            if (brokerInfo == null) {
1628                return null;
1629            }
1630            return brokerInfo.getBrokerName();
1631        } catch (InterruptedException e) {
1632            Thread.currentThread().interrupt();
1633            return null;
1634        }
1635    }
1636
1637    /**
1638     * Returns the broker information if it is available or null if it is not
1639     * available yet.
1640     */
1641    public BrokerInfo getBrokerInfo() {
1642        return brokerInfo;
1643    }
1644
1645    /**
1646     * @return Returns the RedeliveryPolicy.
1647     * @throws JMSException
1648     */
1649    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1650        return redeliveryPolicyMap.getDefaultEntry();
1651    }
1652
1653    /**
1654     * Sets the redelivery policy to be used when messages are rolled back
1655     */
1656    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1657        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1658    }
1659
1660    public BlobTransferPolicy getBlobTransferPolicy() {
1661        if (blobTransferPolicy == null) {
1662            blobTransferPolicy = createBlobTransferPolicy();
1663        }
1664        return blobTransferPolicy;
1665    }
1666
1667    /**
1668     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1669     * OBjects) are transferred from producers to brokers to consumers
1670     */
1671    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1672        this.blobTransferPolicy = blobTransferPolicy;
1673    }
1674
1675    /**
1676     * @return Returns the alwaysSessionAsync.
1677     */
1678    public boolean isAlwaysSessionAsync() {
1679        return alwaysSessionAsync;
1680    }
1681
1682    /**
1683     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1684     * the Connection. However, a separate thread is always used if there is more than one session, or the session
1685     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1686     * happens asynchronously.
1687     */
1688    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1689        this.alwaysSessionAsync = alwaysSessionAsync;
1690    }
1691
1692    /**
1693     * @return Returns the optimizeAcknowledge.
1694     */
1695    public boolean isOptimizeAcknowledge() {
1696        return optimizeAcknowledge;
1697    }
1698
1699    /**
1700     * Enables an optimised acknowledgement mode where messages are acknowledged
1701     * in batches rather than individually
1702     *
1703     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1704     */
1705    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1706        this.optimizeAcknowledge = optimizeAcknowledge;
1707    }
1708
1709    /**
1710     * The max time in milliseconds between optimized ack batches
1711     * @param optimizeAcknowledgeTimeOut
1712     */
1713    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1714        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1715    }
1716
1717    public long getOptimizeAcknowledgeTimeOut() {
1718        return optimizeAcknowledgeTimeOut;
1719    }
1720
1721    public long getWarnAboutUnstartedConnectionTimeout() {
1722        return warnAboutUnstartedConnectionTimeout;
1723    }
1724
1725    /**
1726     * Enables the timeout from a connection creation to when a warning is
1727     * generated if the connection is not properly started via {@link #start()}
1728     * and a message is received by a consumer. It is a very common gotcha to
1729     * forget to <a
1730     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1731     * the connection</a> so this option makes the default case to create a
1732     * warning if the user forgets. To disable the warning just set the value to <
1733     * 0 (say -1).
1734     */
1735    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1736        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1737    }
1738
1739    /**
1740     * @return the sendTimeout (in milliseconds)
1741     */
1742    public int getSendTimeout() {
1743        return sendTimeout;
1744    }
1745
1746    /**
1747     * @param sendTimeout the sendTimeout to set (in milliseconds)
1748     */
1749    public void setSendTimeout(int sendTimeout) {
1750        this.sendTimeout = sendTimeout;
1751    }
1752
1753    /**
1754     * @return the sendAcksAsync
1755     */
1756    public boolean isSendAcksAsync() {
1757        return sendAcksAsync;
1758    }
1759
1760    /**
1761     * @param sendAcksAsync the sendAcksAsync to set
1762     */
1763    public void setSendAcksAsync(boolean sendAcksAsync) {
1764        this.sendAcksAsync = sendAcksAsync;
1765    }
1766
1767    /**
1768     * Returns the time this connection was created
1769     */
1770    public long getTimeCreated() {
1771        return timeCreated;
1772    }
1773
1774    private void waitForBrokerInfo() throws JMSException {
1775        try {
1776            brokerInfoReceived.await();
1777        } catch (InterruptedException e) {
1778            Thread.currentThread().interrupt();
1779            throw JMSExceptionSupport.create(e);
1780        }
1781    }
1782
1783    // Package protected so that it can be used in unit tests
1784    public Transport getTransport() {
1785        return transport;
1786    }
1787
1788    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1789        producers.put(producerId, producer);
1790    }
1791
1792    public void removeProducer(ProducerId producerId) {
1793        producers.remove(producerId);
1794    }
1795
1796    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1797        dispatchers.put(consumerId, dispatcher);
1798    }
1799
1800    public void removeDispatcher(ConsumerId consumerId) {
1801        dispatchers.remove(consumerId);
1802    }
1803
1804    public boolean hasDispatcher(ConsumerId consumerId) {
1805        return dispatchers.containsKey(consumerId);
1806    }
1807
1808    /**
1809     * @param o - the command to consume
1810     */
1811    @Override
1812    public void onCommand(final Object o) {
1813        final Command command = (Command)o;
1814        if (!closed.get() && command != null) {
1815            try {
1816                command.visit(new CommandVisitorAdapter() {
1817                    @Override
1818                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1819                        waitForTransportInterruptionProcessingToComplete();
1820                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1821                        if (dispatcher != null) {
1822                            // Copy in case a embedded broker is dispatching via
1823                            // vm://
1824                            // md.getMessage() == null to signal end of queue
1825                            // browse.
1826                            Message msg = md.getMessage();
1827                            if (msg != null) {
1828                                msg = msg.copy();
1829                                msg.setReadOnlyBody(true);
1830                                msg.setReadOnlyProperties(true);
1831                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1832                                msg.setConnection(ActiveMQConnection.this);
1833                                msg.setMemoryUsage(null);
1834                                md.setMessage(msg);
1835                            }
1836                            dispatcher.dispatch(md);
1837                        } else {
1838                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
1839                        }
1840                        return null;
1841                    }
1842
1843                    @Override
1844                    public Response processProducerAck(ProducerAck pa) throws Exception {
1845                        if (pa != null && pa.getProducerId() != null) {
1846                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1847                            if (producer != null) {
1848                                producer.onProducerAck(pa);
1849                            }
1850                        }
1851                        return null;
1852                    }
1853
1854                    @Override
1855                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1856                        brokerInfo = info;
1857                        brokerInfoReceived.countDown();
1858                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1859                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1860                        return null;
1861                    }
1862
1863                    @Override
1864                    public Response processConnectionError(final ConnectionError error) throws Exception {
1865                        executor.execute(new Runnable() {
1866                            @Override
1867                            public void run() {
1868                                onAsyncException(error.getException());
1869                            }
1870                        });
1871                        return null;
1872                    }
1873
1874                    @Override
1875                    public Response processControlCommand(ControlCommand command) throws Exception {
1876                        onControlCommand(command);
1877                        return null;
1878                    }
1879
1880                    @Override
1881                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1882                        onConnectionControl((ConnectionControl)command);
1883                        return null;
1884                    }
1885
1886                    @Override
1887                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1888                        onConsumerControl((ConsumerControl)command);
1889                        return null;
1890                    }
1891
1892                    @Override
1893                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1894                        onWireFormatInfo((WireFormatInfo)command);
1895                        return null;
1896                    }
1897                });
1898            } catch (Exception e) {
1899                onClientInternalException(e);
1900            }
1901        }
1902
1903        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1904            TransportListener listener = iter.next();
1905            listener.onCommand(command);
1906        }
1907    }
1908
1909    protected void onWireFormatInfo(WireFormatInfo info) {
1910        protocolVersion.set(info.getVersion());
1911    }
1912
1913    /**
1914     * Handles async client internal exceptions.
1915     * A client internal exception is usually one that has been thrown
1916     * by a container runtime component during asynchronous processing of a
1917     * message that does not affect the connection itself.
1918     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1919     * its <code>onException</code> method, if one has been registered with this connection.
1920     *
1921     * @param error the exception that the problem
1922     */
1923    public void onClientInternalException(final Throwable error) {
1924        if ( !closed.get() && !closing.get() ) {
1925            if ( this.clientInternalExceptionListener != null ) {
1926                executor.execute(new Runnable() {
1927                    @Override
1928                    public void run() {
1929                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1930                    }
1931                });
1932            } else {
1933                LOG.debug("Async client internal exception occurred with no exception listener registered: "
1934                        + error, error);
1935            }
1936        }
1937    }
1938
1939    /**
1940     * Used for handling async exceptions
1941     *
1942     * @param error
1943     */
1944    public void onAsyncException(Throwable error) {
1945        if (!closed.get() && !closing.get()) {
1946            if (this.exceptionListener != null) {
1947
1948                if (!(error instanceof JMSException)) {
1949                    error = JMSExceptionSupport.create(error);
1950                }
1951                final JMSException e = (JMSException)error;
1952
1953                executor.execute(new Runnable() {
1954                    @Override
1955                    public void run() {
1956                        ActiveMQConnection.this.exceptionListener.onException(e);
1957                    }
1958                });
1959
1960            } else {
1961                LOG.debug("Async exception with no exception listener: " + error, error);
1962            }
1963        }
1964    }
1965
1966    @Override
1967    public void onException(final IOException error) {
1968        onAsyncException(error);
1969        if (!closing.get() && !closed.get()) {
1970            executor.execute(new Runnable() {
1971                @Override
1972                public void run() {
1973                    transportFailed(error);
1974                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
1975                    brokerInfoReceived.countDown();
1976                    try {
1977                        doCleanup(true);
1978                    } catch (JMSException e) {
1979                        LOG.warn("Exception during connection cleanup, " + e, e);
1980                    }
1981                    for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1982                        TransportListener listener = iter.next();
1983                        listener.onException(error);
1984                    }
1985                }
1986            });
1987        }
1988    }
1989
1990    @Override
1991    public void transportInterupted() {
1992        transportInterruptionProcessingComplete.set(1);
1993        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1994            ActiveMQSession s = i.next();
1995            s.clearMessagesInProgress(transportInterruptionProcessingComplete);
1996        }
1997
1998        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1999            connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
2000        }
2001
2002        if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
2003            if (LOG.isDebugEnabled()) {
2004                LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
2005            }
2006            signalInterruptionProcessingNeeded();
2007        }
2008
2009        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2010            TransportListener listener = iter.next();
2011            listener.transportInterupted();
2012        }
2013    }
2014
2015    @Override
2016    public void transportResumed() {
2017        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2018            TransportListener listener = iter.next();
2019            listener.transportResumed();
2020        }
2021    }
2022
2023    /**
2024     * Create the DestinationInfo object for the temporary destination.
2025     *
2026     * @param topic - if its true topic, else queue.
2027     * @return DestinationInfo
2028     * @throws JMSException
2029     */
2030    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2031
2032        // Check if Destination info is of temporary type.
2033        ActiveMQTempDestination dest;
2034        if (topic) {
2035            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2036        } else {
2037            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2038        }
2039
2040        DestinationInfo info = new DestinationInfo();
2041        info.setConnectionId(this.info.getConnectionId());
2042        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2043        info.setDestination(dest);
2044        syncSendPacket(info);
2045
2046        dest.setConnection(this);
2047        activeTempDestinations.put(dest, dest);
2048        return dest;
2049    }
2050
2051    /**
2052     * @param destination
2053     * @throws JMSException
2054     */
2055    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2056
2057        checkClosedOrFailed();
2058
2059        for (ActiveMQSession session : this.sessions) {
2060            if (session.isInUse(destination)) {
2061                throw new JMSException("A consumer is consuming from the temporary destination");
2062            }
2063        }
2064
2065        activeTempDestinations.remove(destination);
2066
2067        DestinationInfo destInfo = new DestinationInfo();
2068        destInfo.setConnectionId(this.info.getConnectionId());
2069        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2070        destInfo.setDestination(destination);
2071        destInfo.setTimeout(0);
2072        syncSendPacket(destInfo);
2073    }
2074
2075    public boolean isDeleted(ActiveMQDestination dest) {
2076
2077        // If we are not watching the advisories.. then
2078        // we will assume that the temp destination does exist.
2079        if (advisoryConsumer == null) {
2080            return false;
2081        }
2082
2083        return !activeTempDestinations.containsValue(dest);
2084    }
2085
2086    public boolean isCopyMessageOnSend() {
2087        return copyMessageOnSend;
2088    }
2089
2090    public LongSequenceGenerator getLocalTransactionIdGenerator() {
2091        return localTransactionIdGenerator;
2092    }
2093
2094    public boolean isUseCompression() {
2095        return useCompression;
2096    }
2097
2098    /**
2099     * Enables the use of compression of the message bodies
2100     */
2101    public void setUseCompression(boolean useCompression) {
2102        this.useCompression = useCompression;
2103    }
2104
2105    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2106
2107        checkClosedOrFailed();
2108        ensureConnectionInfoSent();
2109
2110        DestinationInfo info = new DestinationInfo();
2111        info.setConnectionId(this.info.getConnectionId());
2112        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2113        info.setDestination(destination);
2114        info.setTimeout(0);
2115        syncSendPacket(info);
2116    }
2117
2118    public boolean isDispatchAsync() {
2119        return dispatchAsync;
2120    }
2121
2122    /**
2123     * Enables or disables the default setting of whether or not consumers have
2124     * their messages <a
2125     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2126     * synchronously or asynchronously by the broker</a>. For non-durable
2127     * topics for example we typically dispatch synchronously by default to
2128     * minimize context switches which boost performance. However sometimes its
2129     * better to go slower to ensure that a single blocked consumer socket does
2130     * not block delivery to other consumers.
2131     *
2132     * @param asyncDispatch If true then consumers created on this connection
2133     *                will default to having their messages dispatched
2134     *                asynchronously. The default value is true.
2135     */
2136    public void setDispatchAsync(boolean asyncDispatch) {
2137        this.dispatchAsync = asyncDispatch;
2138    }
2139
2140    public boolean isObjectMessageSerializationDefered() {
2141        return objectMessageSerializationDefered;
2142    }
2143
2144    /**
2145     * When an object is set on an ObjectMessage, the JMS spec requires the
2146     * object to be serialized by that set method. Enabling this flag causes the
2147     * object to not get serialized. The object may subsequently get serialized
2148     * if the message needs to be sent over a socket or stored to disk.
2149     */
2150    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2151        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2152    }
2153
2154    /**
2155     * Unsubscribes a durable subscription that has been created by a client.
2156     * <P>
2157     * This method deletes the state being maintained on behalf of the
2158     * subscriber by its provider.
2159     * <P>
2160     * It is erroneous for a client to delete a durable subscription while there
2161     * is an active <CODE>MessageConsumer </CODE> or
2162     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2163     * message is part of a pending transaction or has not been acknowledged in
2164     * the session.
2165     *
2166     * @param name the name used to identify this subscription
2167     * @throws JMSException if the session fails to unsubscribe to the durable
2168     *                 subscription due to some internal error.
2169     * @throws InvalidDestinationException if an invalid subscription name is
2170     *                 specified.
2171     * @since 1.1
2172     */
2173    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2174        checkClosedOrFailed();
2175        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2176        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2177        rsi.setSubscriptionName(name);
2178        rsi.setClientId(getConnectionInfo().getClientId());
2179        syncSendPacket(rsi);
2180    }
2181
2182    /**
2183     * Internal send method optimized: - It does not copy the message - It can
2184     * only handle ActiveMQ messages. - You can specify if the send is async or
2185     * sync - Does not allow you to send /w a transaction.
2186     */
2187    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2188        checkClosedOrFailed();
2189
2190        if (destination.isTemporary() && isDeleted(destination)) {
2191            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2192        }
2193
2194        msg.setJMSDestination(destination);
2195        msg.setJMSDeliveryMode(deliveryMode);
2196        long expiration = 0L;
2197
2198        if (!isDisableTimeStampsByDefault()) {
2199            long timeStamp = System.currentTimeMillis();
2200            msg.setJMSTimestamp(timeStamp);
2201            if (timeToLive > 0) {
2202                expiration = timeToLive + timeStamp;
2203            }
2204        }
2205
2206        msg.setJMSExpiration(expiration);
2207        msg.setJMSPriority(priority);
2208        msg.setJMSRedelivered(false);
2209        msg.setMessageId(messageId);
2210        msg.onSend();
2211        msg.setProducerId(msg.getMessageId().getProducerId());
2212
2213        if (LOG.isDebugEnabled()) {
2214            LOG.debug("Sending message: " + msg);
2215        }
2216
2217        if (async) {
2218            asyncSendPacket(msg);
2219        } else {
2220            syncSendPacket(msg);
2221        }
2222    }
2223
2224    protected void onControlCommand(ControlCommand command) {
2225        String text = command.getCommand();
2226        if (text != null) {
2227            if ("shutdown".equals(text)) {
2228                LOG.info("JVM told to shutdown");
2229                System.exit(0);
2230            }
2231
2232            // TODO Should we handle the "close" case?
2233            // if (false && "close".equals(text)){
2234            //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2235            //     try {
2236            //         close();
2237            //     } catch (JMSException e) {
2238            //     }
2239            // }
2240        }
2241    }
2242
2243    protected void onConnectionControl(ConnectionControl command) {
2244        if (command.isFaultTolerant()) {
2245            this.optimizeAcknowledge = false;
2246            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2247                ActiveMQSession s = i.next();
2248                s.setOptimizeAcknowledge(false);
2249            }
2250        }
2251    }
2252
2253    protected void onConsumerControl(ConsumerControl command) {
2254        if (command.isClose()) {
2255            for (ActiveMQSession session : this.sessions) {
2256                session.close(command.getConsumerId());
2257            }
2258        } else {
2259            for (ActiveMQSession session : this.sessions) {
2260                session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2261            }
2262            for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2263                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2264                if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2265                    consumerInfo.setPrefetchSize(command.getPrefetch());
2266                }
2267            }
2268        }
2269    }
2270
2271    protected void transportFailed(IOException error) {
2272        transportFailed.set(true);
2273        if (firstFailureError == null) {
2274            firstFailureError = error;
2275        }
2276    }
2277
2278    /**
2279     * Should a JMS message be copied to a new JMS Message object as part of the
2280     * send() method in JMS. This is enabled by default to be compliant with the
2281     * JMS specification. You can disable it if you do not mutate JMS messages
2282     * after they are sent for a performance boost
2283     */
2284    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2285        this.copyMessageOnSend = copyMessageOnSend;
2286    }
2287
2288    @Override
2289    public String toString() {
2290        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2291    }
2292
2293    protected BlobTransferPolicy createBlobTransferPolicy() {
2294        return new BlobTransferPolicy();
2295    }
2296
2297    public int getProtocolVersion() {
2298        return protocolVersion.get();
2299    }
2300
2301    public int getProducerWindowSize() {
2302        return producerWindowSize;
2303    }
2304
2305    public void setProducerWindowSize(int producerWindowSize) {
2306        this.producerWindowSize = producerWindowSize;
2307    }
2308
2309    public void setAuditDepth(int auditDepth) {
2310        connectionAudit.setAuditDepth(auditDepth);
2311    }
2312
2313    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2314        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2315    }
2316
2317    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2318        connectionAudit.removeDispatcher(dispatcher);
2319    }
2320
2321    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2322        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2323    }
2324
2325    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2326        connectionAudit.rollbackDuplicate(dispatcher, message);
2327    }
2328
2329    public IOException getFirstFailureError() {
2330        return firstFailureError;
2331    }
2332
2333    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2334        if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
2335            LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
2336            signalInterruptionProcessingComplete();
2337        }
2338    }
2339
2340    protected void transportInterruptionProcessingComplete() {
2341        if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
2342            signalInterruptionProcessingComplete();
2343        }
2344    }
2345
2346    private void signalInterruptionProcessingComplete() {
2347            if (LOG.isDebugEnabled()) {
2348                LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
2349                        + " for:" + this.getConnectionInfo().getConnectionId());
2350            }
2351
2352            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2353            if (failoverTransport != null) {
2354                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2355                if (LOG.isDebugEnabled()) {
2356                    LOG.debug("notified failover transport (" + failoverTransport
2357                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2358                }
2359            }
2360            transportInterruptionProcessingComplete.set(0);
2361    }
2362
2363    private void signalInterruptionProcessingNeeded() {
2364        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2365        if (failoverTransport != null) {
2366            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2367            if (LOG.isDebugEnabled()) {
2368                LOG.debug("notified failover transport (" + failoverTransport
2369                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2370            }
2371        }
2372    }
2373
2374    /*
2375     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2376     * will wait to receive re dispatched messages.
2377     * default value is 0 so there is no wait by default.
2378     */
2379    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2380        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2381    }
2382
2383    public long getConsumerFailoverRedeliveryWaitPeriod() {
2384        return consumerFailoverRedeliveryWaitPeriod;
2385    }
2386
2387    protected Scheduler getScheduler() throws JMSException {
2388        Scheduler result = scheduler;
2389        if (result == null) {
2390            if (isClosing() || isClosed()) {
2391                // without lock contention report the closing state
2392                throw new ConnectionClosedException();
2393            }
2394            synchronized (this) {
2395                result = scheduler;
2396                if (result == null) {
2397                    checkClosed();
2398                    try {
2399                        result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2400                        result.start();
2401                        scheduler = result;
2402                    } catch(Exception e) {
2403                        throw JMSExceptionSupport.create(e);
2404                    }
2405                }
2406            }
2407        }
2408        return result;
2409    }
2410
2411    protected ThreadPoolExecutor getExecutor() {
2412        return this.executor;
2413    }
2414
2415    protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
2416        return sessions;
2417    }
2418
2419    /**
2420     * @return the checkForDuplicates
2421     */
2422    public boolean isCheckForDuplicates() {
2423        return this.checkForDuplicates;
2424    }
2425
2426    /**
2427     * @param checkForDuplicates the checkForDuplicates to set
2428     */
2429    public void setCheckForDuplicates(boolean checkForDuplicates) {
2430        this.checkForDuplicates = checkForDuplicates;
2431    }
2432
2433    public boolean isTransactedIndividualAck() {
2434        return transactedIndividualAck;
2435    }
2436
2437    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2438        this.transactedIndividualAck = transactedIndividualAck;
2439    }
2440
2441    public boolean isNonBlockingRedelivery() {
2442        return nonBlockingRedelivery;
2443    }
2444
2445    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2446        this.nonBlockingRedelivery = nonBlockingRedelivery;
2447    }
2448
2449    public boolean isRmIdFromConnectionId() {
2450        return rmIdFromConnectionId;
2451    }
2452
2453    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
2454        this.rmIdFromConnectionId = rmIdFromConnectionId;
2455    }
2456
2457    /**
2458     * Removes any TempDestinations that this connection has cached, ignoring
2459     * any exceptions generated because the destination is in use as they should
2460     * not be removed.
2461     * Used from a pooled connection, b/c it will not be explicitly closed.
2462     */
2463    public void cleanUpTempDestinations() {
2464
2465        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2466            return;
2467        }
2468
2469        Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2470            = this.activeTempDestinations.entrySet().iterator();
2471        while(entries.hasNext()) {
2472            ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2473            try {
2474                // Only delete this temp destination if it was created from this connection. The connection used
2475                // for the advisory consumer may also have a reference to this temp destination.
2476                ActiveMQTempDestination dest = entry.getValue();
2477                String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2478                if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2479                    this.deleteTempDestination(entry.getValue());
2480                }
2481            } catch (Exception ex) {
2482                // the temp dest is in use so it can not be deleted.
2483                // it is ok to leave it to connection tear down phase
2484            }
2485        }
2486    }
2487
2488    /**
2489     * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2490     * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2491     */
2492    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2493        this.redeliveryPolicyMap = redeliveryPolicyMap;
2494    }
2495
2496    /**
2497     * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2498     * Consumers when dealing with transaction messages that have been rolled back.
2499     *
2500     * @return the redeliveryPolicyMap
2501     */
2502    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2503        return redeliveryPolicyMap;
2504    }
2505
2506    public int getMaxThreadPoolSize() {
2507        return maxThreadPoolSize;
2508    }
2509
2510    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
2511        this.maxThreadPoolSize = maxThreadPoolSize;
2512    }
2513
2514    /**
2515     * Enable enforcement of QueueConnection semantics.
2516     *
2517     * @return this object, useful for chaining
2518     */
2519    ActiveMQConnection enforceQueueOnlyConnection() {
2520        this.queueOnlyConnection = true;
2521        return this;
2522    }
2523
2524    public RejectedExecutionHandler getRejectedTaskHandler() {
2525        return rejectedTaskHandler;
2526    }
2527
2528    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
2529        this.rejectedTaskHandler = rejectedTaskHandler;
2530    }
2531
2532    /**
2533     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2534     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
2535     * will not do any background Message acknowledgment.
2536     *
2537     * @return the scheduledOptimizedAckInterval
2538     */
2539    public long getOptimizedAckScheduledAckInterval() {
2540        return optimizedAckScheduledAckInterval;
2541    }
2542
2543    /**
2544     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2545     * have been configured with optimizeAcknowledge enabled.
2546     *
2547     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
2548     */
2549    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2550        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2551    }
2552
2553    /**
2554     * @return true if MessageConsumer instance will check for expired messages before dispatch.
2555     */
2556    public boolean isConsumerExpiryCheckEnabled() {
2557        return consumerExpiryCheckEnabled;
2558    }
2559
2560    /**
2561     * Controls whether message expiration checking is done in each MessageConsumer
2562     * prior to dispatching a message.  Disabling this check can lead to consumption
2563     * of expired messages.
2564     *
2565     * @param consumerExpiryCheckEnabled
2566     *        controls whether expiration checking is done prior to dispatch.
2567     */
2568    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
2569        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
2570    }
2571
2572    public List<String> getTrustedPackages() {
2573        return trustedPackages;
2574    }
2575
2576    public void setTrustedPackages(List<String> trustedPackages) {
2577        this.trustedPackages = trustedPackages;
2578    }
2579
2580    public boolean isTrustAllPackages() {
2581        return trustAllPackages;
2582    }
2583
2584    public void setTrustAllPackages(boolean trustAllPackages) {
2585        this.trustAllPackages = trustAllPackages;
2586    }
2587
2588    public int getConnectResponseTimeout() {
2589        return connectResponseTimeout;
2590    }
2591
2592        public void setConnectResponseTimeout(int connectResponseTimeout) {
2593                this.connectResponseTimeout = connectResponseTimeout;
2594        }
2595}