001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.*;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.RejectedExecutionHandler;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import javax.jms.Connection;
036import javax.jms.ConnectionConsumer;
037import javax.jms.ConnectionMetaData;
038import javax.jms.Destination;
039import javax.jms.ExceptionListener;
040import javax.jms.IllegalStateException;
041import javax.jms.InvalidDestinationException;
042import javax.jms.JMSException;
043import javax.jms.Queue;
044import javax.jms.QueueConnection;
045import javax.jms.QueueSession;
046import javax.jms.ServerSessionPool;
047import javax.jms.Session;
048import javax.jms.Topic;
049import javax.jms.TopicConnection;
050import javax.jms.TopicSession;
051import javax.jms.XAConnection;
052
053import org.apache.activemq.advisory.DestinationSource;
054import org.apache.activemq.blob.BlobTransferPolicy;
055import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
056import org.apache.activemq.command.ActiveMQDestination;
057import org.apache.activemq.command.ActiveMQMessage;
058import org.apache.activemq.command.ActiveMQTempDestination;
059import org.apache.activemq.command.ActiveMQTempQueue;
060import org.apache.activemq.command.ActiveMQTempTopic;
061import org.apache.activemq.command.BrokerInfo;
062import org.apache.activemq.command.Command;
063import org.apache.activemq.command.CommandTypes;
064import org.apache.activemq.command.ConnectionControl;
065import org.apache.activemq.command.ConnectionError;
066import org.apache.activemq.command.ConnectionId;
067import org.apache.activemq.command.ConnectionInfo;
068import org.apache.activemq.command.ConsumerControl;
069import org.apache.activemq.command.ConsumerId;
070import org.apache.activemq.command.ConsumerInfo;
071import org.apache.activemq.command.ControlCommand;
072import org.apache.activemq.command.DestinationInfo;
073import org.apache.activemq.command.ExceptionResponse;
074import org.apache.activemq.command.Message;
075import org.apache.activemq.command.MessageDispatch;
076import org.apache.activemq.command.MessageId;
077import org.apache.activemq.command.ProducerAck;
078import org.apache.activemq.command.ProducerId;
079import org.apache.activemq.command.RemoveInfo;
080import org.apache.activemq.command.RemoveSubscriptionInfo;
081import org.apache.activemq.command.Response;
082import org.apache.activemq.command.SessionId;
083import org.apache.activemq.command.ShutdownInfo;
084import org.apache.activemq.command.WireFormatInfo;
085import org.apache.activemq.management.JMSConnectionStatsImpl;
086import org.apache.activemq.management.JMSStatsImpl;
087import org.apache.activemq.management.StatsCapable;
088import org.apache.activemq.management.StatsImpl;
089import org.apache.activemq.state.CommandVisitorAdapter;
090import org.apache.activemq.thread.Scheduler;
091import org.apache.activemq.thread.TaskRunnerFactory;
092import org.apache.activemq.transport.FutureResponse;
093import org.apache.activemq.transport.RequestTimedOutIOException;
094import org.apache.activemq.transport.ResponseCallback;
095import org.apache.activemq.transport.Transport;
096import org.apache.activemq.transport.TransportListener;
097import org.apache.activemq.transport.failover.FailoverTransport;
098import org.apache.activemq.util.IdGenerator;
099import org.apache.activemq.util.IntrospectionSupport;
100import org.apache.activemq.util.JMSExceptionSupport;
101import org.apache.activemq.util.LongSequenceGenerator;
102import org.apache.activemq.util.ServiceSupport;
103import org.apache.activemq.util.ThreadPoolUtils;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
108
    // Defaults re-exported from ActiveMQConnectionFactory so callers can reach them via the connection type.
    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
    // Initial value of maxThreadPoolSize for new connections. NOTE(review): non-final, so it can be
    // reassigned globally at runtime - confirm that this is intentional.
    public static int DEFAULT_THREAD_POOL_SIZE = 1000;

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);

    // Temporary destinations tracked by this connection; emptied by close().
    public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();

    // Handed to every ActiveMQSession built by createSession() via isDispatchAsync()/isAlwaysSessionAsync().
    protected boolean dispatchAsync=true;
    protected boolean alwaysSessionAsync = true;

    private TaskRunnerFactory sessionTaskRunner;
    // Single-threaded pool created in the constructor.
    private final ThreadPoolExecutor executor;

    // Connection state variables
    private final ConnectionInfo info;
    private ExceptionListener exceptionListener;
    private ClientInternalExceptionListener clientInternalExceptionListener;
    // Checked by setClientID() to reject a second assignment.
    private boolean clientIDSet;
    // Checked by setClientID() (rejects changes on a used connection) and by close()
    // (decides whether the broker must be told about the shutdown).
    private boolean isConnectionInfoSentToBroker;
    // Set to true by both setClientID() and setDefaultClientID().
    private boolean userSpecifiedClientID;

    // Configuration options variables
    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    private BlobTransferPolicy blobTransferPolicy;
    private RedeliveryPolicyMap redeliveryPolicyMap;
    private MessageTransformer transformer;

    private boolean disableTimeStampsByDefault;
    private boolean optimizedMessageDispatch = true;
    private boolean copyMessageOnSend = true;
    private boolean useCompression;
    private boolean objectMessageSerializationDefered;
    private boolean useAsyncSend;
    private boolean optimizeAcknowledge;
    private long optimizeAcknowledgeTimeOut = 0;
    private long optimizedAckScheduledAckInterval = 0;
    private boolean nestedMapAndListEnabled = true;
    private boolean useRetroactiveConsumer;
    private boolean exclusiveConsumer;
    private boolean alwaysSyncSend;
    // Timeout passed to doSyncSendPacket() when close() sends the RemoveInfo command
    // (presumably milliseconds - TODO confirm against doSyncSendPacket).
    private int closeTimeout = 15000;
    private boolean watchTopicAdvisories = true;
    private long warnAboutUnstartedConnectionTimeout = 500L;
    private int sendTimeout =0;
    private boolean sendAcksAsync=true;
    private boolean checkForDuplicates = true;
    private boolean queueOnlyConnection = false;
    private boolean consumerExpiryCheckEnabled = true;

    // Collaborators supplied to the constructor.
    private final Transport transport;
    private final IdGenerator clientIdGenerator;
    private final JMSStatsImpl factoryStats;
    // Created in the constructor over the live sessions list.
    private final JMSConnectionStatsImpl stats;

    // Lifecycle flags: 'started' is flipped by start()/doStop(); 'closing' is set and 'closed' is read by close().
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
    // Iterated by start(), doStop() and close().
    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
    // Each entry is disposed by close().
    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();

    // Maps ConsumerIds to the ActiveMQDispatcher registered for them
    private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
    // Maps ProducerIds to their ActiveMQMessageProducer
    private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
    // Source of the sequence part of SessionIds handed out by getNextSessionId().
    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
    // Connection-level session id; the constructor builds it with the value -1.
    private final SessionId connectionSessionId;
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();

    // Disposed and cleared by close().
    private AdvisoryConsumer advisoryConsumer;
    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
    private BrokerInfo brokerInfo;
    private IOException firstFailureError;
    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;

    // Assume that protocol is the latest. Change to the actual protocol
    // version when a WireFormatInfo is received.
    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    // System.currentTimeMillis() captured at construction.
    private final long timeCreated;
    // Duplicate checking is switched on in the constructor only when the transport is fault tolerant.
    private final ConnectionAudit connectionAudit = new ConnectionAudit();
    // Stopped and cleared by close().
    private DestinationSource destinationSource;
    // NOTE(review): presumably guards ensureConnectionInfoSent(); that method is not visible here - confirm.
    private final Object ensureConnectionInfoSentMutex = new Object();
    private boolean useDedicatedTaskRunner;
    protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
    private long consumerFailoverRedeliveryWaitPeriod;
    // Stopped by close() when non-null.
    private Scheduler scheduler;
    private boolean messagePrioritySupported = false;
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;
    private boolean rmIdFromConnectionId = false;

    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
    private RejectedExecutionHandler rejectedTaskHandler = null;

    private List<String> trustedPackages = new ArrayList<String>();
    private boolean trustAllPackages = false;
209
210    /**
211     * Construct an <code>ActiveMQConnection</code>
212     *
213     * @param transport
214     * @param factoryStats
215     * @throws Exception
216     */
217    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
218
219        this.transport = transport;
220        this.clientIdGenerator = clientIdGenerator;
221        this.factoryStats = factoryStats;
222
223        // Configure a single threaded executor who's core thread can timeout if
224        // idle
225        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
226            @Override
227            public Thread newThread(Runnable r) {
228                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
229                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
230                //thread.setDaemon(true);
231                return thread;
232            }
233        });
234        // asyncConnectionThread.allowCoreThreadTimeOut(true);
235        String uniqueId = connectionIdGenerator.generateId();
236        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
237        this.info.setManageable(true);
238        this.info.setFaultTolerant(transport.isFaultTolerant());
239        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
240
241        this.transport.setTransportListener(this);
242
243        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
244        this.factoryStats.addConnection(this);
245        this.timeCreated = System.currentTimeMillis();
246        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
247    }
248
249    protected void setUserName(String userName) {
250        this.info.setUserName(userName);
251    }
252
253    protected void setPassword(String password) {
254        this.info.setPassword(password);
255    }
256
257    /**
258     * A static helper method to create a new connection
259     *
260     * @return an ActiveMQConnection
261     * @throws JMSException
262     */
263    public static ActiveMQConnection makeConnection() throws JMSException {
264        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
265        return (ActiveMQConnection)factory.createConnection();
266    }
267
268    /**
269     * A static helper method to create a new connection
270     *
271     * @param uri
272     * @return and ActiveMQConnection
273     * @throws JMSException
274     */
275    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
276        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
277        return (ActiveMQConnection)factory.createConnection();
278    }
279
280    /**
281     * A static helper method to create a new connection
282     *
283     * @param user
284     * @param password
285     * @param uri
286     * @return an ActiveMQConnection
287     * @throws JMSException
288     */
289    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
290        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
291        return (ActiveMQConnection)factory.createConnection();
292    }
293
294    /**
295     * @return a number unique for this connection
296     */
297    public JMSConnectionStatsImpl getConnectionStats() {
298        return stats;
299    }
300
301    /**
302     * Creates a <CODE>Session</CODE> object.
303     *
304     * @param transacted indicates whether the session is transacted
305     * @param acknowledgeMode indicates whether the consumer or the client will
306     *                acknowledge any messages it receives; ignored if the
307     *                session is transacted. Legal values are
308     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
309     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
310     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
311     * @return a newly created session
312     * @throws JMSException if the <CODE>Connection</CODE> object fails to
313     *                 create a session due to some internal error or lack of
314     *                 support for the specific transaction and acknowledgement
315     *                 mode.
316     * @see Session#AUTO_ACKNOWLEDGE
317     * @see Session#CLIENT_ACKNOWLEDGE
318     * @see Session#DUPS_OK_ACKNOWLEDGE
319     * @since 1.1
320     */
321    @Override
322    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
323        checkClosedOrFailed();
324        ensureConnectionInfoSent();
325        if(!transacted) {
326            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
327                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
328            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
329                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
330                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
331            }
332        }
333        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
334            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
335    }
336
337    /**
338     * @return sessionId
339     */
340    protected SessionId getNextSessionId() {
341        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
342    }
343
344    /**
345     * Gets the client identifier for this connection.
346     * <P>
347     * This value is specific to the JMS provider. It is either preconfigured by
348     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
349     * dynamically by the application by calling the <code>setClientID</code>
350     * method.
351     *
352     * @return the unique client identifier
353     * @throws JMSException if the JMS provider fails to return the client ID
354     *                 for this connection due to some internal error.
355     */
356    @Override
357    public String getClientID() throws JMSException {
358        checkClosedOrFailed();
359        return this.info.getClientId();
360    }
361
362    /**
363     * Sets the client identifier for this connection.
364     * <P>
365     * The preferred way to assign a JMS client's client identifier is for it to
366     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
367     * object and transparently assigned to the <CODE>Connection</CODE> object
368     * it creates.
369     * <P>
370     * Alternatively, a client can set a connection's client identifier using a
371     * provider-specific value. The facility to set a connection's client
372     * identifier explicitly is not a mechanism for overriding the identifier
373     * that has been administratively configured. It is provided for the case
374     * where no administratively specified identifier exists. If one does exist,
375     * an attempt to change it by setting it must throw an
376     * <CODE>IllegalStateException</CODE>. If a client sets the client
377     * identifier explicitly, it must do so immediately after it creates the
378     * connection and before any other action on the connection is taken. After
379     * this point, setting the client identifier is a programming error that
380     * should throw an <CODE>IllegalStateException</CODE>.
381     * <P>
382     * The purpose of the client identifier is to associate a connection and its
383     * objects with a state maintained on behalf of the client by a provider.
384     * The only such state identified by the JMS API is that required to support
385     * durable subscriptions.
386     * <P>
387     * If another connection with the same <code>clientID</code> is already
388     * running when this method is called, the JMS provider should detect the
389     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
390     *
391     * @param newClientID the unique client identifier
392     * @throws JMSException if the JMS provider fails to set the client ID for
393     *                 this connection due to some internal error.
394     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
395     *                 invalid or duplicate client ID.
396     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
397     *                 a connection's client ID at the wrong time or when it has
398     *                 been administratively configured.
399     */
400    @Override
401    public void setClientID(String newClientID) throws JMSException {
402        checkClosedOrFailed();
403
404        if (this.clientIDSet) {
405            throw new IllegalStateException("The clientID has already been set");
406        }
407
408        if (this.isConnectionInfoSentToBroker) {
409            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
410        }
411
412        this.info.setClientId(newClientID);
413        this.userSpecifiedClientID = true;
414        ensureConnectionInfoSent();
415    }
416
417    /**
418     * Sets the default client id that the connection will use if explicitly not
419     * set with the setClientId() call.
420     */
421    public void setDefaultClientID(String clientID) throws JMSException {
422        this.info.setClientId(clientID);
423        this.userSpecifiedClientID = true;
424    }
425
426    /**
427     * Gets the metadata for this connection.
428     *
429     * @return the connection metadata
430     * @throws JMSException if the JMS provider fails to get the connection
431     *                 metadata for this connection.
432     * @see javax.jms.ConnectionMetaData
433     */
434    @Override
435    public ConnectionMetaData getMetaData() throws JMSException {
436        checkClosedOrFailed();
437        return ActiveMQConnectionMetaData.INSTANCE;
438    }
439
440    /**
441     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
442     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
443     * associated with it.
444     *
445     * @return the <CODE>ExceptionListener</CODE> for this connection, or
446     *         null, if no <CODE>ExceptionListener</CODE> is associated with
447     *         this connection.
448     * @throws JMSException if the JMS provider fails to get the
449     *                 <CODE>ExceptionListener</CODE> for this connection.
450     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
451     */
452    @Override
453    public ExceptionListener getExceptionListener() throws JMSException {
454        checkClosedOrFailed();
455        return this.exceptionListener;
456    }
457
458    /**
459     * Sets an exception listener for this connection.
460     * <P>
461     * If a JMS provider detects a serious problem with a connection, it informs
462     * the connection's <CODE> ExceptionListener</CODE>, if one has been
463     * registered. It does this by calling the listener's <CODE>onException
464     * </CODE>
465     * method, passing it a <CODE>JMSException</CODE> object describing the
466     * problem.
467     * <P>
468     * An exception listener allows a client to be notified of a problem
469     * asynchronously. Some connections only consume messages, so they would
470     * have no other way to learn their connection has failed.
471     * <P>
472     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
473     * <P>
474     * A JMS provider should attempt to resolve connection problems itself
475     * before it notifies the client of them.
476     *
477     * @param listener the exception listener
478     * @throws JMSException if the JMS provider fails to set the exception
479     *                 listener for this connection.
480     */
481    @Override
482    public void setExceptionListener(ExceptionListener listener) throws JMSException {
483        checkClosedOrFailed();
484        this.exceptionListener = listener;
485    }
486
487    /**
488     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
489     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
490     * associated with it.
491     *
492     * @return the listener or <code>null</code> if no listener is registered with the connection.
493     */
494    public ClientInternalExceptionListener getClientInternalExceptionListener() {
495        return clientInternalExceptionListener;
496    }
497
498    /**
499     * Sets a client internal exception listener for this connection.
500     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
501     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
502     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
503     * describing the problem.
504     *
505     * @param listener the exception listener
506     */
507    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
508        this.clientInternalExceptionListener = listener;
509    }
510
511    /**
512     * Starts (or restarts) a connection's delivery of incoming messages. A call
513     * to <CODE>start</CODE> on a connection that has already been started is
514     * ignored.
515     *
516     * @throws JMSException if the JMS provider fails to start message delivery
517     *                 due to some internal error.
518     * @see javax.jms.Connection#stop()
519     */
520    @Override
521    public void start() throws JMSException {
522        checkClosedOrFailed();
523        ensureConnectionInfoSent();
524        if (started.compareAndSet(false, true)) {
525            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
526                ActiveMQSession session = i.next();
527                session.start();
528            }
529        }
530    }
531
532    /**
533     * Temporarily stops a connection's delivery of incoming messages. Delivery
534     * can be restarted using the connection's <CODE>start</CODE> method. When
535     * the connection is stopped, delivery to all the connection's message
536     * consumers is inhibited: synchronous receives block, and messages are not
537     * delivered to message listeners.
538     * <P>
539     * This call blocks until receives and/or message listeners in progress have
540     * completed.
541     * <P>
542     * Stopping a connection has no effect on its ability to send messages. A
543     * call to <CODE>stop</CODE> on a connection that has already been stopped
544     * is ignored.
545     * <P>
546     * A call to <CODE>stop</CODE> must not return until delivery of messages
547     * has paused. This means that a client can rely on the fact that none of
548     * its message listeners will be called and that all threads of control
549     * waiting for <CODE>receive</CODE> calls to return will not return with a
550     * message until the connection is restarted. The receive timers for a
551     * stopped connection continue to advance, so receives may time out while
552     * the connection is stopped.
553     * <P>
554     * If message listeners are running when <CODE>stop</CODE> is invoked, the
555     * <CODE>stop</CODE> call must wait until all of them have returned before
556     * it may return. While these message listeners are completing, they must
557     * have the full services of the connection available to them.
558     *
559     * @throws JMSException if the JMS provider fails to stop message delivery
560     *                 due to some internal error.
561     * @see javax.jms.Connection#start()
562     */
563    @Override
564    public void stop() throws JMSException {
565        doStop(true);
566    }
567
568    /**
569     * @see #stop()
570     * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed,
571     *                    <tt>false</tt> to skip this check
572     * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
573     */
574    void doStop(boolean checkClosed) throws JMSException {
575        if (checkClosed) {
576            checkClosedOrFailed();
577        }
578        if (started.compareAndSet(true, false)) {
579            synchronized(sessions) {
580                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
581                    ActiveMQSession s = i.next();
582                    s.stop();
583                }
584            }
585        }
586    }
587
588    /**
589     * Closes the connection.
590     * <P>
591     * Since a provider typically allocates significant resources outside the
592     * JVM on behalf of a connection, clients should close these resources when
593     * they are not needed. Relying on garbage collection to eventually reclaim
594     * these resources may not be timely enough.
595     * <P>
596     * There is no need to close the sessions, producers, and consumers of a
597     * closed connection.
598     * <P>
599     * Closing a connection causes all temporary destinations to be deleted.
600     * <P>
601     * When this method is invoked, it should not return until message
602     * processing has been shut down in an orderly fashion. This means that all
603     * message listeners that may have been running have returned, and that all
604     * pending receives have returned. A close terminates all pending message
605     * receives on the connection's sessions' consumers. The receives may return
606     * with a message or with null, depending on whether there was a message
607     * available at the time of the close. If one or more of the connection's
608     * sessions' message listeners is processing a message at the time when
609     * connection <CODE>close</CODE> is invoked, all the facilities of the
610     * connection and its sessions must remain available to those listeners
611     * until they return control to the JMS provider.
612     * <P>
613     * Closing a connection causes any of its sessions' transactions in progress
614     * to be rolled back. In the case where a session's work is coordinated by
615     * an external transaction manager, a session's <CODE>commit</CODE> and
616     * <CODE> rollback</CODE> methods are not used and the result of a closed
617     * session's work is determined later by the transaction manager. Closing a
618     * connection does NOT force an acknowledgment of client-acknowledged
619     * sessions.
620     * <P>
621     * Invoking the <CODE>acknowledge</CODE> method of a received message from
622     * a closed connection's session must throw an
623     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
624     * NOT throw an exception.
625     *
626     * @throws JMSException if the JMS provider fails to close the connection
627     *                 due to some internal error. For example, a failure to
628     *                 release resources or to close a socket connection can
629     *                 cause this exception to be thrown.
630     */
    @Override
    public void close() throws JMSException {
        // Overall shape: an outer try/finally whose finally block always runs
        // the last-resort cleanup (executor, transport, factory stats), even
        // when the orderly shutdown inside the try throws.
        try {
            // If we were running, lets stop first.
            if (!closed.get() && !transportFailed.get()) {
                // do not fail if already closed as according to JMS spec we must not
                // throw exception if already closed
                doStop(false);
            }

            synchronized (this) {
                // Re-checked under the lock: once closed has been set, this
                // whole section is skipped on later calls.
                if (!closed.get()) {
                    closing.set(true);

                    // Release the lazily created helpers and drop the references.
                    if (destinationSource != null) {
                        destinationSource.stop();
                        destinationSource = null;
                    }
                    if (advisoryConsumer != null) {
                        advisoryConsumer.dispose();
                        advisoryConsumer = null;
                    }

                    Scheduler scheduler = this.scheduler;
                    if (scheduler != null) {
                        try {
                            scheduler.stop();
                        } catch (Exception e) {
                            // NOTE(review): this throw leaves the synchronized
                            // block before sessions are disposed and before
                            // closed/closing are updated below, so closing stays
                            // true and closed stays false - confirm intended.
                            JMSException ex =  JMSExceptionSupport.create(e);
                            throw ex;
                        }
                    }

                    // Dispose every session and remember the highest
                    // last-delivered sequence id seen (-1 when there are none);
                    // it is reported to the broker in the RemoveInfo below.
                    long lastDeliveredSequenceId = -1;
                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
                        ActiveMQSession s = i.next();
                        s.dispose();
                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
                    }
                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
                        ActiveMQConnectionConsumer c = i.next();
                        c.dispose();
                    }

                    this.activeTempDestinations.clear();

                    try {
                        if (isConnectionInfoSentToBroker) {
                            // If we announced ourselves to the broker.. Try to let the broker
                            // know that the connection is being shutdown.
                            RemoveInfo removeCommand = info.createRemoveCommand();
                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
                            try {
                                // Synchronous send, passing closeTimeout to the send call.
                                doSyncSendPacket(removeCommand, closeTimeout);
                            } catch (JMSException e) {
                                // Only a failure caused by a request timeout is
                                // tolerated; anything else is rethrown.
                                if (e.getCause() instanceof RequestTimedOutIOException) {
                                    // expected
                                } else {
                                    throw e;
                                }
                            }
                            // One-way send; no response is awaited.
                            doAsyncSendPacket(new ShutdownInfo());
                        }
                    } finally { // release anyway even if previous communication fails
                        started.set(false);

                        // TODO if we move the TaskRunnerFactory to the connection
                        // factory
                        // then we may need to call
                        // factory.onConnectionClose(this);
                        if (sessionTaskRunner != null) {
                            sessionTaskRunner.shutdown();
                        }
                        // Flip the state flags last: the connection now reports
                        // closed and no longer closing.
                        closed.set(true);
                        closing.set(false);
                    }
                }
            }
        } finally {
            // Executor shutdown is best effort: any failure is logged and ignored
            // so the transport is still disposed and the stats entry removed.
            try {
                if (executor != null) {
                    ThreadPoolUtils.shutdown(executor);
                }
            } catch (Throwable e) {
                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
            }

            ServiceSupport.dispose(this.transport);

            factoryStats.removeConnection(this);
        }
    }
723
724    /**
725     * Tells the broker to terminate its VM. This can be used to cleanly
726     * terminate a broker running in a standalone java process. Server must have
727     * property enable.vm.shutdown=true defined to allow this to work.
728     */
729    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
730    // implemented.
731    /*
732     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
733     * command = new BrokerAdminCommand();
734     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
735     * asyncSendPacket(command); }
736     */
737
738    /**
739     * Create a durable connection consumer for this connection (optional
740     * operation). This is an expert facility not used by regular JMS clients.
741     *
742     * @param topic topic to access
743     * @param subscriptionName durable subscription name
744     * @param messageSelector only messages with properties matching the message
745     *                selector expression are delivered. A value of null or an
746     *                empty string indicates that there is no message selector
747     *                for the message consumer.
748     * @param sessionPool the server session pool to associate with this durable
749     *                connection consumer
750     * @param maxMessages the maximum number of messages that can be assigned to
751     *                a server session at one time
752     * @return the durable connection consumer
753     * @throws JMSException if the <CODE>Connection</CODE> object fails to
754     *                 create a connection consumer due to some internal error
755     *                 or invalid arguments for <CODE>sessionPool</CODE> and
756     *                 <CODE>messageSelector</CODE>.
757     * @throws javax.jms.InvalidDestinationException if an invalid destination
758     *                 is specified.
759     * @throws javax.jms.InvalidSelectorException if the message selector is
760     *                 invalid.
761     * @see javax.jms.ConnectionConsumer
762     * @since 1.1
763     */
764    @Override
765    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
766        throws JMSException {
767        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
768    }
769
770    /**
771     * Create a durable connection consumer for this connection (optional
772     * operation). This is an expert facility not used by regular JMS clients.
773     *
774     * @param topic topic to access
775     * @param subscriptionName durable subscription name
776     * @param messageSelector only messages with properties matching the message
777     *                selector expression are delivered. A value of null or an
778     *                empty string indicates that there is no message selector
779     *                for the message consumer.
780     * @param sessionPool the server session pool to associate with this durable
781     *                connection consumer
782     * @param maxMessages the maximum number of messages that can be assigned to
783     *                a server session at one time
784     * @param noLocal set true if you want to filter out messages published
785     *                locally
786     * @return the durable connection consumer
787     * @throws JMSException if the <CODE>Connection</CODE> object fails to
788     *                 create a connection consumer due to some internal error
789     *                 or invalid arguments for <CODE>sessionPool</CODE> and
790     *                 <CODE>messageSelector</CODE>.
791     * @throws javax.jms.InvalidDestinationException if an invalid destination
792     *                 is specified.
793     * @throws javax.jms.InvalidSelectorException if the message selector is
794     *                 invalid.
795     * @see javax.jms.ConnectionConsumer
796     * @since 1.1
797     */
798    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
799                                                              boolean noLocal) throws JMSException {
800        checkClosedOrFailed();
801
802        if (queueOnlyConnection) {
803            throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
804        }
805
806        ensureConnectionInfoSent();
807        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
808        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
809        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
810        info.setSubscriptionName(subscriptionName);
811        info.setSelector(messageSelector);
812        info.setPrefetchSize(maxMessages);
813        info.setDispatchAsync(isDispatchAsync());
814
815        // Allows the options on the destination to configure the consumerInfo
816        if (info.getDestination().getOptions() != null) {
817            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
818            IntrospectionSupport.setProperties(this.info, options, "consumer.");
819        }
820
821        return new ActiveMQConnectionConsumer(this, sessionPool, info);
822    }
823
824    // Properties
825    // -------------------------------------------------------------------------
826
827    /**
828     * Returns true if this connection has been started
829     *
830     * @return true if this Connection is started
831     */
832    public boolean isStarted() {
833        return started.get();
834    }
835
836    /**
837     * Returns true if the connection is closed
838     */
839    public boolean isClosed() {
840        return closed.get();
841    }
842
843    /**
844     * Returns true if the connection is in the process of being closed
845     */
846    public boolean isClosing() {
847        return closing.get();
848    }
849
850    /**
851     * Returns true if the underlying transport has failed
852     */
853    public boolean isTransportFailed() {
854        return transportFailed.get();
855    }
856
857    /**
858     * @return Returns the prefetchPolicy.
859     */
860    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
861        return prefetchPolicy;
862    }
863
864    /**
865     * Sets the <a
866     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
867     * policy</a> for consumers created by this connection.
868     */
869    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
870        this.prefetchPolicy = prefetchPolicy;
871    }
872
873    /**
874     */
875    public Transport getTransportChannel() {
876        return transport;
877    }
878
879    /**
880     * @return Returns the clientID of the connection, forcing one to be
881     *         generated if one has not yet been configured.
882     */
883    public String getInitializedClientID() throws JMSException {
884        ensureConnectionInfoSent();
885        return info.getClientId();
886    }
887
888    /**
889     * @return Returns the timeStampsDisableByDefault.
890     */
891    public boolean isDisableTimeStampsByDefault() {
892        return disableTimeStampsByDefault;
893    }
894
895    /**
896     * Sets whether or not timestamps on messages should be disabled or not. If
897     * you disable them it adds a small performance boost.
898     */
899    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
900        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
901    }
902
903    /**
904     * @return Returns the dispatchOptimizedMessage.
905     */
906    public boolean isOptimizedMessageDispatch() {
907        return optimizedMessageDispatch;
908    }
909
910    /**
911     * If this flag is set then an larger prefetch limit is used - only
912     * applicable for durable topic subscribers.
913     */
914    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
915        this.optimizedMessageDispatch = dispatchOptimizedMessage;
916    }
917
918    /**
919     * @return Returns the closeTimeout.
920     */
921    public int getCloseTimeout() {
922        return closeTimeout;
923    }
924
925    /**
926     * Sets the timeout before a close is considered complete. Normally a
927     * close() on a connection waits for confirmation from the broker; this
928     * allows that operation to timeout to save the client hanging if there is
929     * no broker
930     */
931    public void setCloseTimeout(int closeTimeout) {
932        this.closeTimeout = closeTimeout;
933    }
934
935    /**
936     * @return ConnectionInfo
937     */
938    public ConnectionInfo getConnectionInfo() {
939        return this.info;
940    }
941
    /**
     * Reports whether retroactive consumers are enabled.
     *
     * @return the current {@code useRetroactiveConsumer} setting
     */
    public boolean isUseRetroactiveConsumer() {
        return useRetroactiveConsumer;
    }
945
946    /**
947     * Sets whether or not retroactive consumers are enabled. Retroactive
948     * consumers allow non-durable topic subscribers to receive old messages
949     * that were published before the non-durable subscriber started.
950     */
951    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
952        this.useRetroactiveConsumer = useRetroactiveConsumer;
953    }
954
    /**
     * Reports whether nested Map and List structures are enabled.
     *
     * @return the current {@code nestedMapAndListEnabled} setting
     */
    public boolean isNestedMapAndListEnabled() {
        return nestedMapAndListEnabled;
    }
958
959    /**
960     * Enables/disables whether or not Message properties and MapMessage entries
961     * support <a
962     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
963     * Structures</a> of Map and List objects
964     */
965    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
966        this.nestedMapAndListEnabled = structuredMapsEnabled;
967    }
968
    /**
     * Reports whether queue consumers are exclusive.
     *
     * @return the current {@code exclusiveConsumer} setting
     */
    public boolean isExclusiveConsumer() {
        return exclusiveConsumer;
    }
972
973    /**
974     * Enables or disables whether or not queue consumers should be exclusive or
975     * not for example to preserve ordering when not using <a
976     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
977     *
978     * @param exclusiveConsumer
979     */
980    public void setExclusiveConsumer(boolean exclusiveConsumer) {
981        this.exclusiveConsumer = exclusiveConsumer;
982    }
983
984    /**
985     * Adds a transport listener so that a client can be notified of events in
986     * the underlying transport
987     */
988    public void addTransportListener(TransportListener transportListener) {
989        transportListeners.add(transportListener);
990    }
991
    /**
     * Removes a transport listener previously added to this connection.
     *
     * @param transportListener the listener to remove from this connection's
     *                listener collection
     */
    public void removeTransportListener(TransportListener transportListener) {
        transportListeners.remove(transportListener);
    }
995
    /**
     * Reports whether a dedicated task runner is requested; the value is
     * handed to the session task runner factory when that factory is lazily
     * created.
     *
     * @return the current {@code useDedicatedTaskRunner} setting
     */
    public boolean isUseDedicatedTaskRunner() {
        return useDedicatedTaskRunner;
    }
999
    /**
     * Sets whether a dedicated task runner is requested. The flag is read
     * when the session task runner factory is lazily created, so changing it
     * afterwards does not affect a factory that already exists.
     *
     * @param useDedicatedTaskRunner the new setting
     */
    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
    }
1003
1004    public TaskRunnerFactory getSessionTaskRunner() {
1005        synchronized (this) {
1006            if (sessionTaskRunner == null) {
1007                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1008                sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1009            }
1010        }
1011        return sessionTaskRunner;
1012    }
1013
    /**
     * Replaces the task runner factory used for session tasks. A non-null
     * value suppresses the lazy creation in {@link #getSessionTaskRunner()},
     * and whatever factory is set is shut down by {@link #close()}.
     *
     * NOTE(review): this write is not synchronized, whereas the lazy creation
     * in the getter runs under the connection's monitor - confirm callers only
     * use it during single-threaded setup.
     *
     * @param sessionTaskRunner the factory to use
     */
    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
        this.sessionTaskRunner = sessionTaskRunner;
    }
1017
    /**
     * Returns the message transformer configured on this connection.
     *
     * @return the transformer, exactly as last set
     */
    public MessageTransformer getTransformer() {
        return transformer;
    }
1021
1022    /**
1023     * Sets the transformer used to transform messages before they are sent on
1024     * to the JMS bus or when they are received from the bus but before they are
1025     * delivered to the JMS client
1026     */
1027    public void setTransformer(MessageTransformer transformer) {
1028        this.transformer = transformer;
1029    }
1030
1031    /**
1032     * @return the statsEnabled
1033     */
1034    public boolean isStatsEnabled() {
1035        return this.stats.isEnabled();
1036    }
1037
1038    /**
1039     * @param statsEnabled the statsEnabled to set
1040     */
1041    public void setStatsEnabled(boolean statsEnabled) {
1042        this.stats.setEnabled(statsEnabled);
1043    }
1044
1045    /**
1046     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1047     * being created or destroyed or to enquire about the current destinations available on the broker
1048     *
1049     * @return a lazily created destination source
1050     * @throws JMSException
1051     */
1052    @Override
1053    public DestinationSource getDestinationSource() throws JMSException {
1054        if (destinationSource == null) {
1055            destinationSource = new DestinationSource(this);
1056            destinationSource.start();
1057        }
1058        return destinationSource;
1059    }
1060
1061    // Implementation methods
1062    // -------------------------------------------------------------------------
1063
1064    /**
1065     * Used internally for adding Sessions to the Connection
1066     *
1067     * @param session
1068     * @throws JMSException
1069     * @throws JMSException
1070     */
1071    protected void addSession(ActiveMQSession session) throws JMSException {
1072        this.sessions.add(session);
1073        if (sessions.size() > 1 || session.isTransacted()) {
1074            optimizedMessageDispatch = false;
1075        }
1076    }
1077
1078    /**
1079     * Used interanlly for removing Sessions from a Connection
1080     *
1081     * @param session
1082     */
1083    protected void removeSession(ActiveMQSession session) {
1084        this.sessions.remove(session);
1085        this.removeDispatcher(session);
1086    }
1087
1088    /**
1089     * Add a ConnectionConsumer
1090     *
1091     * @param connectionConsumer
1092     * @throws JMSException
1093     */
1094    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1095        this.connectionConsumers.add(connectionConsumer);
1096    }
1097
1098    /**
1099     * Remove a ConnectionConsumer
1100     *
1101     * @param connectionConsumer
1102     */
1103    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1104        this.connectionConsumers.remove(connectionConsumer);
1105        this.removeDispatcher(connectionConsumer);
1106    }
1107
1108    /**
1109     * Creates a <CODE>TopicSession</CODE> object.
1110     *
1111     * @param transacted indicates whether the session is transacted
1112     * @param acknowledgeMode indicates whether the consumer or the client will
1113     *                acknowledge any messages it receives; ignored if the
1114     *                session is transacted. Legal values are
1115     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1116     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1117     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1118     * @return a newly created topic session
1119     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1120     *                 to create a session due to some internal error or lack of
1121     *                 support for the specific transaction and acknowledgement
1122     *                 mode.
1123     * @see Session#AUTO_ACKNOWLEDGE
1124     * @see Session#CLIENT_ACKNOWLEDGE
1125     * @see Session#DUPS_OK_ACKNOWLEDGE
1126     */
1127    @Override
1128    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1129        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1130    }
1131
1132    /**
1133     * Creates a connection consumer for this connection (optional operation).
1134     * This is an expert facility not used by regular JMS clients.
1135     *
1136     * @param topic the topic to access
1137     * @param messageSelector only messages with properties matching the message
1138     *                selector expression are delivered. A value of null or an
1139     *                empty string indicates that there is no message selector
1140     *                for the message consumer.
1141     * @param sessionPool the server session pool to associate with this
1142     *                connection consumer
1143     * @param maxMessages the maximum number of messages that can be assigned to
1144     *                a server session at one time
1145     * @return the connection consumer
1146     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1147     *                 to create a connection consumer due to some internal
1148     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1149     *                 and <CODE>messageSelector</CODE>.
1150     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1151     *                 specified.
1152     * @throws javax.jms.InvalidSelectorException if the message selector is
1153     *                 invalid.
1154     * @see javax.jms.ConnectionConsumer
1155     */
1156    @Override
1157    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1158        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1159    }
1160
1161    /**
1162     * Creates a connection consumer for this connection (optional operation).
1163     * This is an expert facility not used by regular JMS clients.
1164     *
1165     * @param queue the queue to access
1166     * @param messageSelector only messages with properties matching the message
1167     *                selector expression are delivered. A value of null or an
1168     *                empty string indicates that there is no message selector
1169     *                for the message consumer.
1170     * @param sessionPool the server session pool to associate with this
1171     *                connection consumer
1172     * @param maxMessages the maximum number of messages that can be assigned to
1173     *                a server session at one time
1174     * @return the connection consumer
1175     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1176     *                 to create a connection consumer due to some internal
1177     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1178     *                 and <CODE>messageSelector</CODE>.
1179     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1180     *                 specified.
1181     * @throws javax.jms.InvalidSelectorException if the message selector is
1182     *                 invalid.
1183     * @see javax.jms.ConnectionConsumer
1184     */
1185    @Override
1186    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1187        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1188    }
1189
1190    /**
1191     * Creates a connection consumer for this connection (optional operation).
1192     * This is an expert facility not used by regular JMS clients.
1193     *
1194     * @param destination the destination to access
1195     * @param messageSelector only messages with properties matching the message
1196     *                selector expression are delivered. A value of null or an
1197     *                empty string indicates that there is no message selector
1198     *                for the message consumer.
1199     * @param sessionPool the server session pool to associate with this
1200     *                connection consumer
1201     * @param maxMessages the maximum number of messages that can be assigned to
1202     *                a server session at one time
1203     * @return the connection consumer
1204     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1205     *                 create a connection consumer due to some internal error
1206     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1207     *                 <CODE>messageSelector</CODE>.
1208     * @throws javax.jms.InvalidDestinationException if an invalid destination
1209     *                 is specified.
1210     * @throws javax.jms.InvalidSelectorException if the message selector is
1211     *                 invalid.
1212     * @see javax.jms.ConnectionConsumer
1213     * @since 1.1
1214     */
1215    @Override
1216    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1217        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1218    }
1219
1220    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1221        throws JMSException {
1222
1223        checkClosedOrFailed();
1224        ensureConnectionInfoSent();
1225
1226        ConsumerId consumerId = createConsumerId();
1227        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1228        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1229        consumerInfo.setSelector(messageSelector);
1230        consumerInfo.setPrefetchSize(maxMessages);
1231        consumerInfo.setNoLocal(noLocal);
1232        consumerInfo.setDispatchAsync(isDispatchAsync());
1233
1234        // Allows the options on the destination to configure the consumerInfo
1235        if (consumerInfo.getDestination().getOptions() != null) {
1236            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1237            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1238        }
1239
1240        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1241    }
1242
1243    /**
1244     * @return a newly created ConsumedId unique to this connection session instance.
1245     */
1246    private ConsumerId createConsumerId() {
1247        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1248    }
1249
1250    /**
1251     * Creates a <CODE>QueueSession</CODE> object.
1252     *
1253     * @param transacted indicates whether the session is transacted
1254     * @param acknowledgeMode indicates whether the consumer or the client will
1255     *                acknowledge any messages it receives; ignored if the
1256     *                session is transacted. Legal values are
1257     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1258     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1259     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1260     * @return a newly created queue session
1261     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1262     *                 to create a session due to some internal error or lack of
1263     *                 support for the specific transaction and acknowledgement
1264     *                 mode.
1265     * @see Session#AUTO_ACKNOWLEDGE
1266     * @see Session#CLIENT_ACKNOWLEDGE
1267     * @see Session#DUPS_OK_ACKNOWLEDGE
1268     */
1269    @Override
1270    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1271        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1272    }
1273
1274    /**
1275     * Ensures that the clientID was manually specified and not auto-generated.
1276     * If the clientID was not specified this method will throw an exception.
1277     * This method is used to ensure that the clientID + durableSubscriber name
1278     * are used correctly.
1279     *
1280     * @throws JMSException
1281     */
1282    public void checkClientIDWasManuallySpecified() throws JMSException {
1283        if (!userSpecifiedClientID) {
1284            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1285        }
1286    }
1287
1288    /**
1289     * send a Packet through the Connection - for internal use only
1290     *
1291     * @param command
1292     * @throws JMSException
1293     */
1294    public void asyncSendPacket(Command command) throws JMSException {
1295        if (isClosed()) {
1296            throw new ConnectionClosedException();
1297        } else {
1298            doAsyncSendPacket(command);
1299        }
1300    }
1301
    /**
     * Hands the command to the transport as a one-way send, without checking
     * whether the connection is closed (close() relies on this to send its
     * final shutdown command).
     *
     * @param command the command to send
     * @throws JMSException wrapping any IOException raised by the transport
     */
    private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }
1309
1310    /**
1311     * Send a packet through a Connection - for internal use only
1312     *
1313     * @param command
1314     *
1315     * @throws JMSException
1316     */
1317    public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1318        if(onComplete==null) {
1319            syncSendPacket(command);
1320        } else {
1321            if (isClosed()) {
1322                throw new ConnectionClosedException();
1323            }
1324            try {
1325                this.transport.asyncRequest(command, new ResponseCallback() {
1326                    @Override
1327                    public void onCompletion(FutureResponse resp) {
1328                        Response response;
1329                        Throwable exception = null;
1330                        try {
1331                            response = resp.getResult();
1332                            if (response.isException()) {
1333                                ExceptionResponse er = (ExceptionResponse)response;
1334                                exception = er.getException();
1335                            }
1336                        } catch (Exception e) {
1337                            exception = e;
1338                        }
1339                        if(exception!=null) {
1340                            if ( exception instanceof JMSException) {
1341                                onComplete.onException((JMSException) exception);
1342                            } else {
1343                                if (isClosed()||closing.get()) {
1344                                    LOG.debug("Received an exception but connection is closing");
1345                                }
1346                                JMSException jmsEx = null;
1347                                try {
1348                                    jmsEx = JMSExceptionSupport.create(exception);
1349                                } catch(Throwable e) {
1350                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1351                                }
1352                                // dispose of transport for security exceptions on connection initiation
1353                                if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1354                                    forceCloseOnSecurityException(exception);
1355                                }
1356                                if (jmsEx !=null) {
1357                                    onComplete.onException(jmsEx);
1358                                }
1359                            }
1360                        } else {
1361                            onComplete.onSuccess();
1362                        }
1363                    }
1364                });
1365            } catch (IOException e) {
1366                throw JMSExceptionSupport.create(e);
1367            }
1368        }
1369    }
1370
    /**
     * Reacts to a security failure reported for a ConnectionInfo command:
     * logs it at trace level and then passes an IOException wrapping the
     * original failure to {@code onException}.
     *
     * @param exception the security failure reported by the broker
     */
    private void forceCloseOnSecurityException(Throwable exception) {
        LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
        onException(new IOException("Force close due to SecurityException on connect", exception));
    }
1375
1376    public Response syncSendPacket(Command command) throws JMSException {
1377        if (isClosed()) {
1378            throw new ConnectionClosedException();
1379        } else {
1380
1381            try {
1382                Response response = (Response)this.transport.request(command);
1383                if (response.isException()) {
1384                    ExceptionResponse er = (ExceptionResponse)response;
1385                    if (er.getException() instanceof JMSException) {
1386                        throw (JMSException)er.getException();
1387                    } else {
1388                        if (isClosed()||closing.get()) {
1389                            LOG.debug("Received an exception but connection is closing");
1390                        }
1391                        JMSException jmsEx = null;
1392                        try {
1393                            jmsEx = JMSExceptionSupport.create(er.getException());
1394                        } catch(Throwable e) {
1395                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1396                        }
1397                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1398                            forceCloseOnSecurityException(er.getException());
1399                        }
1400                        if (jmsEx !=null) {
1401                            throw jmsEx;
1402                        }
1403                    }
1404                }
1405                return response;
1406            } catch (IOException e) {
1407                throw JMSExceptionSupport.create(e);
1408            }
1409        }
1410    }
1411
1412    /**
1413     * Send a packet through a Connection - for internal use only
1414     *
1415     * @param command
1416     *
1417     * @return the broker Response for the given Command.
1418     *
1419     * @throws JMSException
1420     */
1421    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1422        if (isClosed() || closing.get()) {
1423            throw new ConnectionClosedException();
1424        } else {
1425            return doSyncSendPacket(command, timeout);
1426        }
1427    }
1428
1429    private Response doSyncSendPacket(Command command, int timeout)
1430            throws JMSException {
1431        try {
1432            Response response = (Response) (timeout > 0
1433                    ? this.transport.request(command, timeout)
1434                    : this.transport.request(command));
1435            if (response != null && response.isException()) {
1436                ExceptionResponse er = (ExceptionResponse)response;
1437                if (er.getException() instanceof JMSException) {
1438                    throw (JMSException)er.getException();
1439                } else {
1440                    throw JMSExceptionSupport.create(er.getException());
1441                }
1442            }
1443            return response;
1444        } catch (IOException e) {
1445            throw JMSExceptionSupport.create(e);
1446        }
1447    }
1448
1449    /**
1450     * @return statistics for this Connection
1451     */
1452    @Override
1453    public StatsImpl getStats() {
1454        return stats;
1455    }
1456
1457    /**
1458     * simply throws an exception if the Connection is already closed or the
1459     * Transport has failed
1460     *
1461     * @throws JMSException
1462     */
1463    protected synchronized void checkClosedOrFailed() throws JMSException {
1464        checkClosed();
1465        if (transportFailed.get()) {
1466            throw new ConnectionFailedException(firstFailureError);
1467        }
1468    }
1469
1470    /**
1471     * simply throws an exception if the Connection is already closed
1472     *
1473     * @throws JMSException
1474     */
1475    protected synchronized void checkClosed() throws JMSException {
1476        if (closed.get()) {
1477            throw new ConnectionClosedException();
1478        }
1479    }
1480
1481    /**
1482     * Send the ConnectionInfo to the Broker
1483     *
1484     * @throws JMSException
1485     */
1486    protected void ensureConnectionInfoSent() throws JMSException {
1487        synchronized(this.ensureConnectionInfoSentMutex) {
1488            // Can we skip sending the ConnectionInfo packet??
1489            if (isConnectionInfoSentToBroker || closed.get()) {
1490                return;
1491            }
1492            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1493            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1494                info.setClientId(clientIdGenerator.generateId());
1495            }
1496            syncSendPacket(info.copy());
1497
1498            this.isConnectionInfoSentToBroker = true;
1499            // Add a temp destination advisory consumer so that
1500            // We know what the valid temporary destinations are on the
1501            // broker without having to do an RPC to the broker.
1502
1503            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1504            if (watchTopicAdvisories) {
1505                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1506            }
1507        }
1508    }
1509
1510    public synchronized boolean isWatchTopicAdvisories() {
1511        return watchTopicAdvisories;
1512    }
1513
    /**
     * Sets the {@code watchTopicAdvisories} flag.
     *
     * @param watchTopicAdvisories the new flag value
     */
    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
        this.watchTopicAdvisories = watchTopicAdvisories;
    }
1517
1518    /**
1519     * @return Returns the useAsyncSend.
1520     */
1521    public boolean isUseAsyncSend() {
1522        return useAsyncSend;
1523    }
1524
1525    /**
1526     * Forces the use of <a
1527     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1528     * adds a massive performance boost; but means that the send() method will
1529     * return immediately whether the message has been sent or not which could
1530     * lead to message loss.
1531     */
1532    public void setUseAsyncSend(boolean useAsyncSend) {
1533        this.useAsyncSend = useAsyncSend;
1534    }
1535
1536    /**
1537     * @return true if always sync send messages
1538     */
1539    public boolean isAlwaysSyncSend() {
1540        return this.alwaysSyncSend;
1541    }
1542
1543    /**
1544     * Set true if always require messages to be sync sent
1545     *
1546     * @param alwaysSyncSend
1547     */
1548    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1549        this.alwaysSyncSend = alwaysSyncSend;
1550    }
1551
1552    /**
1553     * @return the messagePrioritySupported
1554     */
1555    public boolean isMessagePrioritySupported() {
1556        return this.messagePrioritySupported;
1557    }
1558
1559    /**
1560     * @param messagePrioritySupported the messagePrioritySupported to set
1561     */
1562    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1563        this.messagePrioritySupported = messagePrioritySupported;
1564    }
1565
1566    /**
1567     * Cleans up this connection so that it's state is as if the connection was
1568     * just created. This allows the Resource Adapter to clean up a connection
1569     * so that it can be reused without having to close and recreate the
1570     * connection.
1571     */
1572    public void cleanup() throws JMSException {
1573        doCleanup(false);
1574    }
1575
1576    public void doCleanup(boolean removeConnection) throws JMSException {
1577        if (advisoryConsumer != null && !isTransportFailed()) {
1578            advisoryConsumer.dispose();
1579            advisoryConsumer = null;
1580        }
1581
1582        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1583            ActiveMQSession s = i.next();
1584            s.dispose();
1585        }
1586        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1587            ActiveMQConnectionConsumer c = i.next();
1588            c.dispose();
1589        }
1590
1591        if (removeConnection) {
1592            if (isConnectionInfoSentToBroker) {
1593                if (!transportFailed.get() && !closing.get()) {
1594                    syncSendPacket(info.createRemoveCommand());
1595                }
1596                isConnectionInfoSentToBroker = false;
1597            }
1598            if (userSpecifiedClientID) {
1599                info.setClientId(null);
1600                userSpecifiedClientID = false;
1601            }
1602            clientIDSet = false;
1603        }
1604
1605        started.set(false);
1606    }
1607
1608    /**
1609     * Changes the associated username/password that is associated with this
1610     * connection. If the connection has been used, you must called cleanup()
1611     * before calling this method.
1612     *
1613     * @throws IllegalStateException if the connection is in used.
1614     */
1615    public void changeUserInfo(String userName, String password) throws JMSException {
1616        if (isConnectionInfoSentToBroker) {
1617            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1618        }
1619        this.info.setUserName(userName);
1620        this.info.setPassword(password);
1621    }
1622
1623    /**
1624     * @return Returns the resourceManagerId.
1625     * @throws JMSException
1626     */
1627    public String getResourceManagerId() throws JMSException {
1628        if (isRmIdFromConnectionId()) {
1629            return info.getConnectionId().getValue();
1630        }
1631        waitForBrokerInfo();
1632        if (brokerInfo == null) {
1633            throw new JMSException("Connection failed before Broker info was received.");
1634        }
1635        return brokerInfo.getBrokerId().getValue();
1636    }
1637
1638    /**
1639     * Returns the broker name if one is available or null if one is not
1640     * available yet.
1641     */
1642    public String getBrokerName() {
1643        try {
1644            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1645            if (brokerInfo == null) {
1646                return null;
1647            }
1648            return brokerInfo.getBrokerName();
1649        } catch (InterruptedException e) {
1650            Thread.currentThread().interrupt();
1651            return null;
1652        }
1653    }
1654
1655    /**
1656     * Returns the broker information if it is available or null if it is not
1657     * available yet.
1658     */
1659    public BrokerInfo getBrokerInfo() {
1660        return brokerInfo;
1661    }
1662
1663    /**
1664     * @return Returns the RedeliveryPolicy.
1665     * @throws JMSException
1666     */
1667    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1668        return redeliveryPolicyMap.getDefaultEntry();
1669    }
1670
1671    /**
1672     * Sets the redelivery policy to be used when messages are rolled back
1673     */
1674    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1675        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1676    }
1677
1678    public BlobTransferPolicy getBlobTransferPolicy() {
1679        if (blobTransferPolicy == null) {
1680            blobTransferPolicy = createBlobTransferPolicy();
1681        }
1682        return blobTransferPolicy;
1683    }
1684
1685    /**
1686     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1687     * OBjects) are transferred from producers to brokers to consumers
1688     */
1689    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1690        this.blobTransferPolicy = blobTransferPolicy;
1691    }
1692
1693    /**
1694     * @return Returns the alwaysSessionAsync.
1695     */
1696    public boolean isAlwaysSessionAsync() {
1697        return alwaysSessionAsync;
1698    }
1699
1700    /**
1701     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1702     * the Connection. However, a separate thread is always used if there is more than one session, or the session
1703     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1704     * happens asynchronously.
1705     */
1706    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1707        this.alwaysSessionAsync = alwaysSessionAsync;
1708    }
1709
1710    /**
1711     * @return Returns the optimizeAcknowledge.
1712     */
1713    public boolean isOptimizeAcknowledge() {
1714        return optimizeAcknowledge;
1715    }
1716
1717    /**
1718     * Enables an optimised acknowledgement mode where messages are acknowledged
1719     * in batches rather than individually
1720     *
1721     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1722     */
1723    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1724        this.optimizeAcknowledge = optimizeAcknowledge;
1725    }
1726
1727    /**
1728     * The max time in milliseconds between optimized ack batches
1729     * @param optimizeAcknowledgeTimeOut
1730     */
1731    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1732        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1733    }
1734
1735    public long getOptimizeAcknowledgeTimeOut() {
1736        return optimizeAcknowledgeTimeOut;
1737    }
1738
1739    public long getWarnAboutUnstartedConnectionTimeout() {
1740        return warnAboutUnstartedConnectionTimeout;
1741    }
1742
1743    /**
1744     * Enables the timeout from a connection creation to when a warning is
1745     * generated if the connection is not properly started via {@link #start()}
1746     * and a message is received by a consumer. It is a very common gotcha to
1747     * forget to <a
1748     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1749     * the connection</a> so this option makes the default case to create a
1750     * warning if the user forgets. To disable the warning just set the value to <
1751     * 0 (say -1).
1752     */
1753    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1754        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1755    }
1756
1757    /**
1758     * @return the sendTimeout (in milliseconds)
1759     */
1760    public int getSendTimeout() {
1761        return sendTimeout;
1762    }
1763
1764    /**
1765     * @param sendTimeout the sendTimeout to set (in milliseconds)
1766     */
1767    public void setSendTimeout(int sendTimeout) {
1768        this.sendTimeout = sendTimeout;
1769    }
1770
1771    /**
1772     * @return the sendAcksAsync
1773     */
1774    public boolean isSendAcksAsync() {
1775        return sendAcksAsync;
1776    }
1777
1778    /**
1779     * @param sendAcksAsync the sendAcksAsync to set
1780     */
1781    public void setSendAcksAsync(boolean sendAcksAsync) {
1782        this.sendAcksAsync = sendAcksAsync;
1783    }
1784
1785    /**
1786     * Returns the time this connection was created
1787     */
1788    public long getTimeCreated() {
1789        return timeCreated;
1790    }
1791
    /**
     * Blocks, without a timeout, until the {@code brokerInfoReceived} latch
     * is released.
     *
     * @throws JMSException if the waiting thread is interrupted; the
     *                 interrupt flag is restored before the exception is thrown
     */
    private void waitForBrokerInfo() throws JMSException {
        try {
            brokerInfoReceived.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw JMSExceptionSupport.create(e);
        }
    }
1800
1801    // Package protected so that it can be used in unit tests
1802    public Transport getTransport() {
1803        return transport;
1804    }
1805
1806    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1807        producers.put(producerId, producer);
1808    }
1809
1810    public void removeProducer(ProducerId producerId) {
1811        producers.remove(producerId);
1812    }
1813
1814    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1815        dispatchers.put(consumerId, dispatcher);
1816    }
1817
1818    public void removeDispatcher(ConsumerId consumerId) {
1819        dispatchers.remove(consumerId);
1820    }
1821
1822    public boolean hasDispatcher(ConsumerId consumerId) {
1823        return dispatchers.containsKey(consumerId);
1824    }
1825
1826    /**
1827     * @param o - the command to consume
1828     */
1829    @Override
1830    public void onCommand(final Object o) {
1831        final Command command = (Command)o;
1832        if (!closed.get() && command != null) {
1833            try {
1834                command.visit(new CommandVisitorAdapter() {
1835                    @Override
1836                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1837                        waitForTransportInterruptionProcessingToComplete();
1838                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1839                        if (dispatcher != null) {
1840                            // Copy in case a embedded broker is dispatching via
1841                            // vm://
1842                            // md.getMessage() == null to signal end of queue
1843                            // browse.
1844                            Message msg = md.getMessage();
1845                            if (msg != null) {
1846                                msg = msg.copy();
1847                                msg.setReadOnlyBody(true);
1848                                msg.setReadOnlyProperties(true);
1849                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1850                                msg.setConnection(ActiveMQConnection.this);
1851                                msg.setMemoryUsage(null);
1852                                md.setMessage(msg);
1853                            }
1854                            dispatcher.dispatch(md);
1855                        } else {
1856                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
1857                        }
1858                        return null;
1859                    }
1860
1861                    @Override
1862                    public Response processProducerAck(ProducerAck pa) throws Exception {
1863                        if (pa != null && pa.getProducerId() != null) {
1864                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1865                            if (producer != null) {
1866                                producer.onProducerAck(pa);
1867                            }
1868                        }
1869                        return null;
1870                    }
1871
1872                    @Override
1873                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1874                        brokerInfo = info;
1875                        brokerInfoReceived.countDown();
1876                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1877                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1878                        return null;
1879                    }
1880
1881                    @Override
1882                    public Response processConnectionError(final ConnectionError error) throws Exception {
1883                        executor.execute(new Runnable() {
1884                            @Override
1885                            public void run() {
1886                                onAsyncException(error.getException());
1887                            }
1888                        });
1889                        return null;
1890                    }
1891
1892                    @Override
1893                    public Response processControlCommand(ControlCommand command) throws Exception {
1894                        onControlCommand(command);
1895                        return null;
1896                    }
1897
1898                    @Override
1899                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1900                        onConnectionControl((ConnectionControl)command);
1901                        return null;
1902                    }
1903
1904                    @Override
1905                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1906                        onConsumerControl((ConsumerControl)command);
1907                        return null;
1908                    }
1909
1910                    @Override
1911                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1912                        onWireFormatInfo((WireFormatInfo)command);
1913                        return null;
1914                    }
1915                });
1916            } catch (Exception e) {
1917                onClientInternalException(e);
1918            }
1919        }
1920
1921        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1922            TransportListener listener = iter.next();
1923            listener.onCommand(command);
1924        }
1925    }
1926
1927    protected void onWireFormatInfo(WireFormatInfo info) {
1928        protocolVersion.set(info.getVersion());
1929    }
1930
1931    /**
1932     * Handles async client internal exceptions.
1933     * A client internal exception is usually one that has been thrown
1934     * by a container runtime component during asynchronous processing of a
1935     * message that does not affect the connection itself.
1936     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1937     * its <code>onException</code> method, if one has been registered with this connection.
1938     *
1939     * @param error the exception that the problem
1940     */
1941    public void onClientInternalException(final Throwable error) {
1942        if ( !closed.get() && !closing.get() ) {
1943            if ( this.clientInternalExceptionListener != null ) {
1944                executor.execute(new Runnable() {
1945                    @Override
1946                    public void run() {
1947                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1948                    }
1949                });
1950            } else {
1951                LOG.debug("Async client internal exception occurred with no exception listener registered: "
1952                        + error, error);
1953            }
1954        }
1955    }
1956
1957    /**
1958     * Used for handling async exceptions
1959     *
1960     * @param error
1961     */
1962    public void onAsyncException(Throwable error) {
1963        if (!closed.get() && !closing.get()) {
1964            if (this.exceptionListener != null) {
1965
1966                if (!(error instanceof JMSException)) {
1967                    error = JMSExceptionSupport.create(error);
1968                }
1969                final JMSException e = (JMSException)error;
1970
1971                executor.execute(new Runnable() {
1972                    @Override
1973                    public void run() {
1974                        ActiveMQConnection.this.exceptionListener.onException(e);
1975                    }
1976                });
1977
1978            } else {
1979                LOG.debug("Async exception with no exception listener: " + error, error);
1980            }
1981        }
1982    }
1983
1984    @Override
1985    public void onException(final IOException error) {
1986        onAsyncException(error);
1987        if (!closing.get() && !closed.get()) {
1988            executor.execute(new Runnable() {
1989                @Override
1990                public void run() {
1991                    transportFailed(error);
1992                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
1993                    brokerInfoReceived.countDown();
1994                    try {
1995                        doCleanup(true);
1996                    } catch (JMSException e) {
1997                        LOG.warn("Exception during connection cleanup, " + e, e);
1998                    }
1999                    for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2000                        TransportListener listener = iter.next();
2001                        listener.onException(error);
2002                    }
2003                }
2004            });
2005        }
2006    }
2007
2008    @Override
2009    public void transportInterupted() {
2010        transportInterruptionProcessingComplete.set(1);
2011        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2012            ActiveMQSession s = i.next();
2013            s.clearMessagesInProgress(transportInterruptionProcessingComplete);
2014        }
2015
2016        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
2017            connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
2018        }
2019
2020        if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
2021            if (LOG.isDebugEnabled()) {
2022                LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
2023            }
2024            signalInterruptionProcessingNeeded();
2025        }
2026
2027        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2028            TransportListener listener = iter.next();
2029            listener.transportInterupted();
2030        }
2031    }
2032
2033    @Override
2034    public void transportResumed() {
2035        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2036            TransportListener listener = iter.next();
2037            listener.transportResumed();
2038        }
2039    }
2040
2041    /**
2042     * Create the DestinationInfo object for the temporary destination.
2043     *
2044     * @param topic - if its true topic, else queue.
2045     * @return DestinationInfo
2046     * @throws JMSException
2047     */
2048    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2049
2050        // Check if Destination info is of temporary type.
2051        ActiveMQTempDestination dest;
2052        if (topic) {
2053            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2054        } else {
2055            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2056        }
2057
2058        DestinationInfo info = new DestinationInfo();
2059        info.setConnectionId(this.info.getConnectionId());
2060        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2061        info.setDestination(dest);
2062        syncSendPacket(info);
2063
2064        dest.setConnection(this);
2065        activeTempDestinations.put(dest, dest);
2066        return dest;
2067    }
2068
2069    /**
2070     * @param destination
2071     * @throws JMSException
2072     */
2073    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2074
2075        checkClosedOrFailed();
2076
2077        for (ActiveMQSession session : this.sessions) {
2078            if (session.isInUse(destination)) {
2079                throw new JMSException("A consumer is consuming from the temporary destination");
2080            }
2081        }
2082
2083        activeTempDestinations.remove(destination);
2084
2085        DestinationInfo destInfo = new DestinationInfo();
2086        destInfo.setConnectionId(this.info.getConnectionId());
2087        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2088        destInfo.setDestination(destination);
2089        destInfo.setTimeout(0);
2090        syncSendPacket(destInfo);
2091    }
2092
2093    public boolean isDeleted(ActiveMQDestination dest) {
2094
2095        // If we are not watching the advisories.. then
2096        // we will assume that the temp destination does exist.
2097        if (advisoryConsumer == null) {
2098            return false;
2099        }
2100
2101        return !activeTempDestinations.containsValue(dest);
2102    }
2103
2104    public boolean isCopyMessageOnSend() {
2105        return copyMessageOnSend;
2106    }
2107
2108    public LongSequenceGenerator getLocalTransactionIdGenerator() {
2109        return localTransactionIdGenerator;
2110    }
2111
2112    public boolean isUseCompression() {
2113        return useCompression;
2114    }
2115
2116    /**
2117     * Enables the use of compression of the message bodies
2118     */
2119    public void setUseCompression(boolean useCompression) {
2120        this.useCompression = useCompression;
2121    }
2122
2123    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2124
2125        checkClosedOrFailed();
2126        ensureConnectionInfoSent();
2127
2128        DestinationInfo info = new DestinationInfo();
2129        info.setConnectionId(this.info.getConnectionId());
2130        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2131        info.setDestination(destination);
2132        info.setTimeout(0);
2133        syncSendPacket(info);
2134    }
2135
2136    public boolean isDispatchAsync() {
2137        return dispatchAsync;
2138    }
2139
2140    /**
2141     * Enables or disables the default setting of whether or not consumers have
2142     * their messages <a
2143     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2144     * synchronously or asynchronously by the broker</a>. For non-durable
2145     * topics for example we typically dispatch synchronously by default to
2146     * minimize context switches which boost performance. However sometimes its
2147     * better to go slower to ensure that a single blocked consumer socket does
2148     * not block delivery to other consumers.
2149     *
2150     * @param asyncDispatch If true then consumers created on this connection
2151     *                will default to having their messages dispatched
2152     *                asynchronously. The default value is true.
2153     */
2154    public void setDispatchAsync(boolean asyncDispatch) {
2155        this.dispatchAsync = asyncDispatch;
2156    }
2157
2158    public boolean isObjectMessageSerializationDefered() {
2159        return objectMessageSerializationDefered;
2160    }
2161
2162    /**
2163     * When an object is set on an ObjectMessage, the JMS spec requires the
2164     * object to be serialized by that set method. Enabling this flag causes the
2165     * object to not get serialized. The object may subsequently get serialized
2166     * if the message needs to be sent over a socket or stored to disk.
2167     */
2168    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2169        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2170    }
2171
2172    /**
2173     * Unsubscribes a durable subscription that has been created by a client.
2174     * <P>
2175     * This method deletes the state being maintained on behalf of the
2176     * subscriber by its provider.
2177     * <P>
2178     * It is erroneous for a client to delete a durable subscription while there
2179     * is an active <CODE>MessageConsumer </CODE> or
2180     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2181     * message is part of a pending transaction or has not been acknowledged in
2182     * the session.
2183     *
2184     * @param name the name used to identify this subscription
2185     * @throws JMSException if the session fails to unsubscribe to the durable
2186     *                 subscription due to some internal error.
2187     * @throws InvalidDestinationException if an invalid subscription name is
2188     *                 specified.
2189     * @since 1.1
2190     */
2191    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2192        checkClosedOrFailed();
2193        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2194        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2195        rsi.setSubscriptionName(name);
2196        rsi.setClientId(getConnectionInfo().getClientId());
2197        syncSendPacket(rsi);
2198    }
2199
2200    /**
2201     * Internal send method optimized: - It does not copy the message - It can
2202     * only handle ActiveMQ messages. - You can specify if the send is async or
2203     * sync - Does not allow you to send /w a transaction.
2204     */
2205    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2206        checkClosedOrFailed();
2207
2208        if (destination.isTemporary() && isDeleted(destination)) {
2209            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2210        }
2211
2212        msg.setJMSDestination(destination);
2213        msg.setJMSDeliveryMode(deliveryMode);
2214        long expiration = 0L;
2215
2216        if (!isDisableTimeStampsByDefault()) {
2217            long timeStamp = System.currentTimeMillis();
2218            msg.setJMSTimestamp(timeStamp);
2219            if (timeToLive > 0) {
2220                expiration = timeToLive + timeStamp;
2221            }
2222        }
2223
2224        msg.setJMSExpiration(expiration);
2225        msg.setJMSPriority(priority);
2226        msg.setJMSRedelivered(false);
2227        msg.setMessageId(messageId);
2228        msg.onSend();
2229        msg.setProducerId(msg.getMessageId().getProducerId());
2230
2231        if (LOG.isDebugEnabled()) {
2232            LOG.debug("Sending message: " + msg);
2233        }
2234
2235        if (async) {
2236            asyncSendPacket(msg);
2237        } else {
2238            syncSendPacket(msg);
2239        }
2240    }
2241
2242    protected void onControlCommand(ControlCommand command) {
2243        String text = command.getCommand();
2244        if (text != null) {
2245            if ("shutdown".equals(text)) {
2246                LOG.info("JVM told to shutdown");
2247                System.exit(0);
2248            }
2249
2250            // TODO Should we handle the "close" case?
2251            // if (false && "close".equals(text)){
2252            //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2253            //     try {
2254            //         close();
2255            //     } catch (JMSException e) {
2256            //     }
2257            // }
2258        }
2259    }
2260
2261    protected void onConnectionControl(ConnectionControl command) {
2262        if (command.isFaultTolerant()) {
2263            this.optimizeAcknowledge = false;
2264            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2265                ActiveMQSession s = i.next();
2266                s.setOptimizeAcknowledge(false);
2267            }
2268        }
2269    }
2270
2271    protected void onConsumerControl(ConsumerControl command) {
2272        if (command.isClose()) {
2273            for (ActiveMQSession session : this.sessions) {
2274                session.close(command.getConsumerId());
2275            }
2276        } else {
2277            for (ActiveMQSession session : this.sessions) {
2278                session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2279            }
2280            for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2281                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2282                if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2283                    consumerInfo.setPrefetchSize(command.getPrefetch());
2284                }
2285            }
2286        }
2287    }
2288
2289    protected void transportFailed(IOException error) {
2290        transportFailed.set(true);
2291        if (firstFailureError == null) {
2292            firstFailureError = error;
2293        }
2294    }
2295
2296    /**
2297     * Should a JMS message be copied to a new JMS Message object as part of the
2298     * send() method in JMS. This is enabled by default to be compliant with the
2299     * JMS specification. You can disable it if you do not mutate JMS messages
2300     * after they are sent for a performance boost
2301     */
2302    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2303        this.copyMessageOnSend = copyMessageOnSend;
2304    }
2305
2306    @Override
2307    public String toString() {
2308        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2309    }
2310
    /**
     * Creates a new {@link BlobTransferPolicy} using its no-argument
     * constructor.
     *
     * @return a freshly created policy instance
     */
    protected BlobTransferPolicy createBlobTransferPolicy() {
        return new BlobTransferPolicy();
    }
2314
    /**
     * @return the protocol version currently recorded for this connection
     */
    public int getProtocolVersion() {
        return protocolVersion.get();
    }
2318
    /**
     * @return the producer window size configured on this connection
     */
    public int getProducerWindowSize() {
        return producerWindowSize;
    }
2322
    /**
     * Sets the producer window size for this connection. The value is stored
     * as given; no range check is applied here.
     *
     * @param producerWindowSize the producer window size to store
     */
    public void setProducerWindowSize(int producerWindowSize) {
        this.producerWindowSize = producerWindowSize;
    }
2326
    /**
     * Forwards the given audit depth to this connection's audit.
     *
     * @param auditDepth the audit depth to apply
     */
    public void setAuditDepth(int auditDepth) {
        connectionAudit.setAuditDepth(auditDepth);
    }
2330
    /**
     * Forwards the given maximum producer number to this connection's audit.
     *
     * @param auditMaximumProducerNumber the maximum number of producers the
     *                audit should be configured with
     */
    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
    }
2334
    /**
     * Removes the given dispatcher from this connection's audit.
     *
     * @param dispatcher the dispatcher to remove
     */
    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
        connectionAudit.removeDispatcher(dispatcher);
    }
2338
2339    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2340        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2341    }
2342
    /**
     * Delegates to the connection audit to roll back its duplicate record of
     * the given message for the given dispatcher.
     *
     * @param dispatcher the dispatcher the message was delivered to
     * @param message the message whose audit entry should be rolled back
     */
    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
        connectionAudit.rollbackDuplicate(dispatcher, message);
    }
2346
    /**
     * @return the first error recorded by {@link #transportFailed(IOException)};
     *         may be null when no failure has been recorded
     */
    public IOException getFirstFailureError() {
        return firstFailureError;
    }
2350
2351    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2352        if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
2353            LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
2354            signalInterruptionProcessingComplete();
2355        }
2356    }
2357
2358    protected void transportInterruptionProcessingComplete() {
2359        if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
2360            signalInterruptionProcessingComplete();
2361        }
2362    }
2363
2364    private void signalInterruptionProcessingComplete() {
2365            if (LOG.isDebugEnabled()) {
2366                LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
2367                        + " for:" + this.getConnectionInfo().getConnectionId());
2368            }
2369
2370            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2371            if (failoverTransport != null) {
2372                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2373                if (LOG.isDebugEnabled()) {
2374                    LOG.debug("notified failover transport (" + failoverTransport
2375                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2376                }
2377            }
2378            transportInterruptionProcessingComplete.set(0);
2379    }
2380
2381    private void signalInterruptionProcessingNeeded() {
2382        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2383        if (failoverTransport != null) {
2384            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2385            if (LOG.isDebugEnabled()) {
2386                LOG.debug("notified failover transport (" + failoverTransport
2387                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2388            }
2389        }
2390    }
2391
2392    /*
2393     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2394     * will wait to receive re dispatched messages.
2395     * default value is 0 so there is no wait by default.
2396     */
2397    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2398        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2399    }
2400
    /**
     * @return the configured consumer failover redelivery wait period, as set
     *         through {@link #setConsumerFailoverRedeliveryWaitPeriod(long)}
     */
    public long getConsumerFailoverRedeliveryWaitPeriod() {
        return consumerFailoverRedeliveryWaitPeriod;
    }
2404
    /**
     * Returns the {@link Scheduler} of this connection, creating and starting
     * it the first time it is requested.
     *
     * @return the scheduler held by this connection
     * @throws JMSException if the connection is closing or closed when a new
     *                 scheduler would have to be created, or if creating or
     *                 starting the scheduler fails
     */
    protected Scheduler getScheduler() throws JMSException {
        // First read happens without holding the lock.
        // NOTE(review): this is only safe if the scheduler field is declared
        // volatile - confirm against the field declaration.
        Scheduler result = scheduler;
        if (result == null) {
            if (isClosing() || isClosed()) {
                // without lock contention report the closing state
                throw new ConnectionClosedException();
            }
            synchronized (this) {
                // Re-read under the lock: another thread may have created the
                // scheduler in the meantime.
                result = scheduler;
                if (result == null) {
                    // presumably throws if the connection was closed meanwhile - see checkClosed()
                    checkClosed();
                    try {
                        result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
                        result.start();
                        // Stored in the field only after start() returned normally.
                        scheduler = result;
                    } catch(Exception e) {
                        throw JMSExceptionSupport.create(e);
                    }
                }
            }
        }
        return result;
    }
2428
    /**
     * @return the thread pool executor held by this connection
     */
    protected ThreadPoolExecutor getExecutor() {
        return this.executor;
    }
2432
    /**
     * @return the list of sessions held by this connection; the list itself
     *         is returned, not a copy
     */
    protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
        return sessions;
    }
2436
2437    /**
2438     * @return the checkForDuplicates
2439     */
2440    public boolean isCheckForDuplicates() {
2441        return this.checkForDuplicates;
2442    }
2443
2444    /**
2445     * @param checkForDuplicates the checkForDuplicates to set
2446     */
2447    public void setCheckForDuplicates(boolean checkForDuplicates) {
2448        this.checkForDuplicates = checkForDuplicates;
2449    }
2450
    /**
     * @return the current value of the transactedIndividualAck flag
     */
    public boolean isTransactedIndividualAck() {
        return transactedIndividualAck;
    }
2454
    /**
     * Sets the transactedIndividualAck flag on this connection.
     *
     * @param transactedIndividualAck the new flag value
     */
    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
        this.transactedIndividualAck = transactedIndividualAck;
    }
2458
    /**
     * @return the current value of the nonBlockingRedelivery flag
     */
    public boolean isNonBlockingRedelivery() {
        return nonBlockingRedelivery;
    }
2462
    /**
     * Sets the nonBlockingRedelivery flag on this connection.
     *
     * @param nonBlockingRedelivery the new flag value
     */
    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
        this.nonBlockingRedelivery = nonBlockingRedelivery;
    }
2466
    /**
     * @return the current value of the rmIdFromConnectionId flag
     */
    public boolean isRmIdFromConnectionId() {
        return rmIdFromConnectionId;
    }
2470
    /**
     * Sets the rmIdFromConnectionId flag on this connection.
     *
     * @param rmIdFromConnectionId the new flag value
     */
    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
        this.rmIdFromConnectionId = rmIdFromConnectionId;
    }
2474
2475    /**
2476     * Removes any TempDestinations that this connection has cached, ignoring
2477     * any exceptions generated because the destination is in use as they should
2478     * not be removed.
2479     * Used from a pooled connection, b/c it will not be explicitly closed.
2480     */
2481    public void cleanUpTempDestinations() {
2482
2483        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2484            return;
2485        }
2486
2487        Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2488            = this.activeTempDestinations.entrySet().iterator();
2489        while(entries.hasNext()) {
2490            ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2491            try {
2492                // Only delete this temp destination if it was created from this connection. The connection used
2493                // for the advisory consumer may also have a reference to this temp destination.
2494                ActiveMQTempDestination dest = entry.getValue();
2495                String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2496                if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2497                    this.deleteTempDestination(entry.getValue());
2498                }
2499            } catch (Exception ex) {
2500                // the temp dest is in use so it can not be deleted.
2501                // it is ok to leave it to connection tear down phase
2502            }
2503        }
2504    }
2505
2506    /**
2507     * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2508     * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2509     */
2510    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2511        this.redeliveryPolicyMap = redeliveryPolicyMap;
2512    }
2513
2514    /**
2515     * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2516     * Consumers when dealing with transaction messages that have been rolled back.
2517     *
2518     * @return the redeliveryPolicyMap
2519     */
2520    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2521        return redeliveryPolicyMap;
2522    }
2523
    /**
     * @return the maximum thread pool size configured on this connection
     */
    public int getMaxThreadPoolSize() {
        return maxThreadPoolSize;
    }
2527
    /**
     * Sets the maximum thread pool size for this connection. The value is
     * stored as given; no range check is applied here.
     *
     * @param maxThreadPoolSize the maximum thread pool size to store
     */
    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
        this.maxThreadPoolSize = maxThreadPoolSize;
    }
2531
2532    /**
2533     * Enable enforcement of QueueConnection semantics.
2534     *
2535     * @return this object, useful for chaining
2536     */
2537    ActiveMQConnection enforceQueueOnlyConnection() {
2538        this.queueOnlyConnection = true;
2539        return this;
2540    }
2541
    /**
     * @return the rejected-execution handler configured on this connection
     */
    public RejectedExecutionHandler getRejectedTaskHandler() {
        return rejectedTaskHandler;
    }
2545
    /**
     * Sets the rejected-execution handler for this connection.
     *
     * @param rejectedTaskHandler the handler to store
     */
    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
        this.rejectedTaskHandler = rejectedTaskHandler;
    }
2549
2550    /**
2551     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2552     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
2553     * will not do any background Message acknowledgment.
2554     *
2555     * @return the scheduledOptimizedAckInterval
2556     */
2557    public long getOptimizedAckScheduledAckInterval() {
2558        return optimizedAckScheduledAckInterval;
2559    }
2560
2561    /**
2562     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2563     * have been configured with optimizeAcknowledge enabled.
2564     *
2565     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
2566     */
2567    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2568        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2569    }
2570
2571    /**
2572     * @return true if MessageConsumer instance will check for expired messages before dispatch.
2573     */
2574    public boolean isConsumerExpiryCheckEnabled() {
2575        return consumerExpiryCheckEnabled;
2576    }
2577
2578    /**
2579     * Controls whether message expiration checking is done in each MessageConsumer
2580     * prior to dispatching a message.  Disabling this check can lead to consumption
2581     * of expired messages.
2582     *
2583     * @param consumerExpiryCheckEnabled
2584     *        controls whether expiration checking is done prior to dispatch.
2585     */
2586    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
2587        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
2588    }
2589
    /**
     * @return the list of trusted package names held by this connection; the
     *         list itself is returned, not a copy
     */
    public List<String> getTrustedPackages() {
        return trustedPackages;
    }
2593
    /**
     * Sets the list of trusted package names. The given list reference is
     * stored as is, without copying.
     *
     * @param trustedPackages the list of trusted package names to store
     */
    public void setTrustedPackages(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }
2597
    /**
     * @return the current value of the trustAllPackages flag
     */
    public boolean isTrustAllPackages() {
        return trustAllPackages;
    }
2601
    /**
     * Sets the trustAllPackages flag on this connection.
     *
     * @param trustAllPackages the new flag value
     */
    public void setTrustAllPackages(boolean trustAllPackages) {
        this.trustAllPackages = trustAllPackages;
    }
2605}