001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.LinkedBlockingQueue;
031    import java.util.concurrent.RejectedExecutionHandler;
032    import java.util.concurrent.ThreadFactory;
033    import java.util.concurrent.ThreadPoolExecutor;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.atomic.AtomicBoolean;
036    import java.util.concurrent.atomic.AtomicInteger;
037    
038    import javax.jms.Connection;
039    import javax.jms.ConnectionConsumer;
040    import javax.jms.ConnectionMetaData;
041    import javax.jms.DeliveryMode;
042    import javax.jms.Destination;
043    import javax.jms.ExceptionListener;
044    import javax.jms.IllegalStateException;
045    import javax.jms.InvalidDestinationException;
046    import javax.jms.JMSException;
047    import javax.jms.Queue;
048    import javax.jms.QueueConnection;
049    import javax.jms.QueueSession;
050    import javax.jms.ServerSessionPool;
051    import javax.jms.Session;
052    import javax.jms.Topic;
053    import javax.jms.TopicConnection;
054    import javax.jms.TopicSession;
055    import javax.jms.XAConnection;
056    
057    import org.apache.activemq.advisory.DestinationSource;
058    import org.apache.activemq.blob.BlobTransferPolicy;
059    import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
060    import org.apache.activemq.command.ActiveMQDestination;
061    import org.apache.activemq.command.ActiveMQMessage;
062    import org.apache.activemq.command.ActiveMQTempDestination;
063    import org.apache.activemq.command.ActiveMQTempQueue;
064    import org.apache.activemq.command.ActiveMQTempTopic;
065    import org.apache.activemq.command.BrokerInfo;
066    import org.apache.activemq.command.Command;
067    import org.apache.activemq.command.CommandTypes;
068    import org.apache.activemq.command.ConnectionControl;
069    import org.apache.activemq.command.ConnectionError;
070    import org.apache.activemq.command.ConnectionId;
071    import org.apache.activemq.command.ConnectionInfo;
072    import org.apache.activemq.command.ConsumerControl;
073    import org.apache.activemq.command.ConsumerId;
074    import org.apache.activemq.command.ConsumerInfo;
075    import org.apache.activemq.command.ControlCommand;
076    import org.apache.activemq.command.DestinationInfo;
077    import org.apache.activemq.command.ExceptionResponse;
078    import org.apache.activemq.command.Message;
079    import org.apache.activemq.command.MessageDispatch;
080    import org.apache.activemq.command.MessageId;
081    import org.apache.activemq.command.ProducerAck;
082    import org.apache.activemq.command.ProducerId;
083    import org.apache.activemq.command.RemoveInfo;
084    import org.apache.activemq.command.RemoveSubscriptionInfo;
085    import org.apache.activemq.command.Response;
086    import org.apache.activemq.command.SessionId;
087    import org.apache.activemq.command.ShutdownInfo;
088    import org.apache.activemq.command.WireFormatInfo;
089    import org.apache.activemq.management.JMSConnectionStatsImpl;
090    import org.apache.activemq.management.JMSStatsImpl;
091    import org.apache.activemq.management.StatsCapable;
092    import org.apache.activemq.management.StatsImpl;
093    import org.apache.activemq.state.CommandVisitorAdapter;
094    import org.apache.activemq.thread.Scheduler;
095    import org.apache.activemq.thread.TaskRunnerFactory;
096    import org.apache.activemq.transport.FutureResponse;
097    import org.apache.activemq.transport.ResponseCallback;
098    import org.apache.activemq.transport.Transport;
099    import org.apache.activemq.transport.TransportListener;
100    import org.apache.activemq.transport.failover.FailoverTransport;
101    import org.apache.activemq.util.IdGenerator;
102    import org.apache.activemq.util.IntrospectionSupport;
103    import org.apache.activemq.util.JMSExceptionSupport;
104    import org.apache.activemq.util.LongSequenceGenerator;
105    import org.apache.activemq.util.ServiceSupport;
106    import org.apache.activemq.util.ThreadPoolUtils;
107    import org.slf4j.Logger;
108    import org.slf4j.LoggerFactory;
109    
110    public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
111    
112        public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
113        public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
114        public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
115        public static int DEFAULT_THREAD_POOL_SIZE = 1000;
116    
117        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
118    
119        public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
120    
121        protected boolean dispatchAsync=true;
122        protected boolean alwaysSessionAsync = true;
123    
124        private TaskRunnerFactory sessionTaskRunner;
125        private final ThreadPoolExecutor executor;
126    
127        // Connection state variables
128        private final ConnectionInfo info;
129        private ExceptionListener exceptionListener;
130        private ClientInternalExceptionListener clientInternalExceptionListener;
131        private boolean clientIDSet;
132        private boolean isConnectionInfoSentToBroker;
133        private boolean userSpecifiedClientID;
134    
135        // Configuration options variables
136        private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
137        private BlobTransferPolicy blobTransferPolicy;
138        private RedeliveryPolicyMap redeliveryPolicyMap;
139        private MessageTransformer transformer;
140    
141        private boolean disableTimeStampsByDefault;
142        private boolean optimizedMessageDispatch = true;
143        private boolean copyMessageOnSend = true;
144        private boolean useCompression;
145        private boolean objectMessageSerializationDefered;
146        private boolean useAsyncSend;
147        private boolean optimizeAcknowledge;
148        private long optimizeAcknowledgeTimeOut = 0;
149        private long optimizedAckScheduledAckInterval = 0;
150        private boolean nestedMapAndListEnabled = true;
151        private boolean useRetroactiveConsumer;
152        private boolean exclusiveConsumer;
153        private boolean alwaysSyncSend;
154        private int closeTimeout = 15000;
155        private boolean watchTopicAdvisories = true;
156        private long warnAboutUnstartedConnectionTimeout = 500L;
157        private int sendTimeout =0;
158        private boolean sendAcksAsync=true;
159        private boolean checkForDuplicates = true;
160        private boolean queueOnlyConnection = false;
161    
162        private final Transport transport;
163        private final IdGenerator clientIdGenerator;
164        private final JMSStatsImpl factoryStats;
165        private final JMSConnectionStatsImpl stats;
166    
167        private final AtomicBoolean started = new AtomicBoolean(false);
168        private final AtomicBoolean closing = new AtomicBoolean(false);
169        private final AtomicBoolean closed = new AtomicBoolean(false);
170        private final AtomicBoolean transportFailed = new AtomicBoolean(false);
171        private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
172        private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
173        private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
174        private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
175        private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
176    
177        // Maps ConsumerIds to ActiveMQConsumer objects
178        private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
179        private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
180        private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
181        private final SessionId connectionSessionId;
182        private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
183        private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
184        private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
185        private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
186    
187        private AdvisoryConsumer advisoryConsumer;
188        private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
189        private BrokerInfo brokerInfo;
190        private IOException firstFailureError;
191        private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
192    
193        // Assume that protocol is the latest. Change to the actual protocol
194        // version when a WireFormatInfo is received.
195        private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
196        private final long timeCreated;
197        private final ConnectionAudit connectionAudit = new ConnectionAudit();
198        private DestinationSource destinationSource;
199        private final Object ensureConnectionInfoSentMutex = new Object();
200        private boolean useDedicatedTaskRunner;
201        protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
202        private long consumerFailoverRedeliveryWaitPeriod;
203        private Scheduler scheduler;
204        private boolean messagePrioritySupported = true;
205        private boolean transactedIndividualAck = false;
206        private boolean nonBlockingRedelivery = false;
207    
208        private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
209        private RejectedExecutionHandler rejectedTaskHandler = null;
210    
211        /**
212         * Construct an <code>ActiveMQConnection</code>
213         *
214         * @param transport
215         * @param factoryStats
216         * @throws Exception
217         */
218        protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
219    
220            this.transport = transport;
221            this.clientIdGenerator = clientIdGenerator;
222            this.factoryStats = factoryStats;
223    
224            // Configure a single threaded executor who's core thread can timeout if
225            // idle
226            executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
227                @Override
228                public Thread newThread(Runnable r) {
229                    Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
230                    //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
231                    //thread.setDaemon(true);
232                    return thread;
233                }
234            });
235            // asyncConnectionThread.allowCoreThreadTimeOut(true);
236            String uniqueId = connectionIdGenerator.generateId();
237            this.info = new ConnectionInfo(new ConnectionId(uniqueId));
238            this.info.setManageable(true);
239            this.info.setFaultTolerant(transport.isFaultTolerant());
240            this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
241    
242            this.transport.setTransportListener(this);
243    
244            this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
245            this.factoryStats.addConnection(this);
246            this.timeCreated = System.currentTimeMillis();
247            this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
248        }
249    
250        protected void setUserName(String userName) {
251            this.info.setUserName(userName);
252        }
253    
254        protected void setPassword(String password) {
255            this.info.setPassword(password);
256        }
257    
258        /**
259         * A static helper method to create a new connection
260         *
261         * @return an ActiveMQConnection
262         * @throws JMSException
263         */
264        public static ActiveMQConnection makeConnection() throws JMSException {
265            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
266            return (ActiveMQConnection)factory.createConnection();
267        }
268    
269        /**
270         * A static helper method to create a new connection
271         *
272         * @param uri
273         * @return and ActiveMQConnection
274         * @throws JMSException
275         */
276        public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
277            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
278            return (ActiveMQConnection)factory.createConnection();
279        }
280    
281        /**
282         * A static helper method to create a new connection
283         *
284         * @param user
285         * @param password
286         * @param uri
287         * @return an ActiveMQConnection
288         * @throws JMSException
289         */
290        public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
291            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
292            return (ActiveMQConnection)factory.createConnection();
293        }
294    
295        /**
296         * @return a number unique for this connection
297         */
298        public JMSConnectionStatsImpl getConnectionStats() {
299            return stats;
300        }
301    
302        /**
303         * Creates a <CODE>Session</CODE> object.
304         *
305         * @param transacted indicates whether the session is transacted
306         * @param acknowledgeMode indicates whether the consumer or the client will
307         *                acknowledge any messages it receives; ignored if the
308         *                session is transacted. Legal values are
309         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
310         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
311         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
312         * @return a newly created session
313         * @throws JMSException if the <CODE>Connection</CODE> object fails to
314         *                 create a session due to some internal error or lack of
315         *                 support for the specific transaction and acknowledgement
316         *                 mode.
317         * @see Session#AUTO_ACKNOWLEDGE
318         * @see Session#CLIENT_ACKNOWLEDGE
319         * @see Session#DUPS_OK_ACKNOWLEDGE
320         * @since 1.1
321         */
322        @Override
323        public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
324            checkClosedOrFailed();
325            ensureConnectionInfoSent();
326            if(!transacted) {
327                if (acknowledgeMode==Session.SESSION_TRANSACTED) {
328                    throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
329                } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
330                    throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
331                            "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
332                }
333            }
334            return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
335                ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
336        }
337    
338        /**
339         * @return sessionId
340         */
341        protected SessionId getNextSessionId() {
342            return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
343        }
344    
345        /**
346         * Gets the client identifier for this connection.
347         * <P>
348         * This value is specific to the JMS provider. It is either preconfigured by
349         * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
350         * dynamically by the application by calling the <code>setClientID</code>
351         * method.
352         *
353         * @return the unique client identifier
354         * @throws JMSException if the JMS provider fails to return the client ID
355         *                 for this connection due to some internal error.
356         */
357        @Override
358        public String getClientID() throws JMSException {
359            checkClosedOrFailed();
360            return this.info.getClientId();
361        }
362    
363        /**
364         * Sets the client identifier for this connection.
365         * <P>
366         * The preferred way to assign a JMS client's client identifier is for it to
367         * be configured in a client-specific <CODE>ConnectionFactory</CODE>
368         * object and transparently assigned to the <CODE>Connection</CODE> object
369         * it creates.
370         * <P>
371         * Alternatively, a client can set a connection's client identifier using a
372         * provider-specific value. The facility to set a connection's client
373         * identifier explicitly is not a mechanism for overriding the identifier
374         * that has been administratively configured. It is provided for the case
375         * where no administratively specified identifier exists. If one does exist,
376         * an attempt to change it by setting it must throw an
377         * <CODE>IllegalStateException</CODE>. If a client sets the client
378         * identifier explicitly, it must do so immediately after it creates the
379         * connection and before any other action on the connection is taken. After
380         * this point, setting the client identifier is a programming error that
381         * should throw an <CODE>IllegalStateException</CODE>.
382         * <P>
383         * The purpose of the client identifier is to associate a connection and its
384         * objects with a state maintained on behalf of the client by a provider.
385         * The only such state identified by the JMS API is that required to support
386         * durable subscriptions.
387         * <P>
388         * If another connection with the same <code>clientID</code> is already
389         * running when this method is called, the JMS provider should detect the
390         * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
391         *
392         * @param newClientID the unique client identifier
393         * @throws JMSException if the JMS provider fails to set the client ID for
394         *                 this connection due to some internal error.
395         * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
396         *                 invalid or duplicate client ID.
397         * @throws javax.jms.IllegalStateException if the JMS client attempts to set
398         *                 a connection's client ID at the wrong time or when it has
399         *                 been administratively configured.
400         */
401        @Override
402        public void setClientID(String newClientID) throws JMSException {
403            checkClosedOrFailed();
404    
405            if (this.clientIDSet) {
406                throw new IllegalStateException("The clientID has already been set");
407            }
408    
409            if (this.isConnectionInfoSentToBroker) {
410                throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
411            }
412    
413            this.info.setClientId(newClientID);
414            this.userSpecifiedClientID = true;
415            ensureConnectionInfoSent();
416        }
417    
418        /**
419         * Sets the default client id that the connection will use if explicitly not
420         * set with the setClientId() call.
421         */
422        public void setDefaultClientID(String clientID) throws JMSException {
423            this.info.setClientId(clientID);
424            this.userSpecifiedClientID = true;
425        }
426    
427        /**
428         * Gets the metadata for this connection.
429         *
430         * @return the connection metadata
431         * @throws JMSException if the JMS provider fails to get the connection
432         *                 metadata for this connection.
433         * @see javax.jms.ConnectionMetaData
434         */
435        @Override
436        public ConnectionMetaData getMetaData() throws JMSException {
437            checkClosedOrFailed();
438            return ActiveMQConnectionMetaData.INSTANCE;
439        }
440    
441        /**
442         * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
443         * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
444         * associated with it.
445         *
446         * @return the <CODE>ExceptionListener</CODE> for this connection, or
447         *         null, if no <CODE>ExceptionListener</CODE> is associated with
448         *         this connection.
449         * @throws JMSException if the JMS provider fails to get the
450         *                 <CODE>ExceptionListener</CODE> for this connection.
451         * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
452         */
453        @Override
454        public ExceptionListener getExceptionListener() throws JMSException {
455            checkClosedOrFailed();
456            return this.exceptionListener;
457        }
458    
459        /**
460         * Sets an exception listener for this connection.
461         * <P>
462         * If a JMS provider detects a serious problem with a connection, it informs
463         * the connection's <CODE> ExceptionListener</CODE>, if one has been
464         * registered. It does this by calling the listener's <CODE>onException
465         * </CODE>
466         * method, passing it a <CODE>JMSException</CODE> object describing the
467         * problem.
468         * <P>
469         * An exception listener allows a client to be notified of a problem
470         * asynchronously. Some connections only consume messages, so they would
471         * have no other way to learn their connection has failed.
472         * <P>
473         * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
474         * <P>
475         * A JMS provider should attempt to resolve connection problems itself
476         * before it notifies the client of them.
477         *
478         * @param listener the exception listener
479         * @throws JMSException if the JMS provider fails to set the exception
480         *                 listener for this connection.
481         */
482        @Override
483        public void setExceptionListener(ExceptionListener listener) throws JMSException {
484            checkClosedOrFailed();
485            this.exceptionListener = listener;
486        }
487    
488        /**
489         * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
490         * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
491         * associated with it.
492         *
493         * @return the listener or <code>null</code> if no listener is registered with the connection.
494         */
495        public ClientInternalExceptionListener getClientInternalExceptionListener() {
496            return clientInternalExceptionListener;
497        }
498    
499        /**
500         * Sets a client internal exception listener for this connection.
501         * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
502         * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
503         * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
504         * describing the problem.
505         *
506         * @param listener the exception listener
507         */
508        public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
509            this.clientInternalExceptionListener = listener;
510        }
511    
512        /**
513         * Starts (or restarts) a connection's delivery of incoming messages. A call
514         * to <CODE>start</CODE> on a connection that has already been started is
515         * ignored.
516         *
517         * @throws JMSException if the JMS provider fails to start message delivery
518         *                 due to some internal error.
519         * @see javax.jms.Connection#stop()
520         */
521        @Override
522        public void start() throws JMSException {
523            checkClosedOrFailed();
524            ensureConnectionInfoSent();
525            if (started.compareAndSet(false, true)) {
526                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
527                    ActiveMQSession session = i.next();
528                    session.start();
529                }
530            }
531        }
532    
533        /**
534         * Temporarily stops a connection's delivery of incoming messages. Delivery
535         * can be restarted using the connection's <CODE>start</CODE> method. When
536         * the connection is stopped, delivery to all the connection's message
537         * consumers is inhibited: synchronous receives block, and messages are not
538         * delivered to message listeners.
539         * <P>
540         * This call blocks until receives and/or message listeners in progress have
541         * completed.
542         * <P>
543         * Stopping a connection has no effect on its ability to send messages. A
544         * call to <CODE>stop</CODE> on a connection that has already been stopped
545         * is ignored.
546         * <P>
547         * A call to <CODE>stop</CODE> must not return until delivery of messages
548         * has paused. This means that a client can rely on the fact that none of
549         * its message listeners will be called and that all threads of control
550         * waiting for <CODE>receive</CODE> calls to return will not return with a
551         * message until the connection is restarted. The receive timers for a
552         * stopped connection continue to advance, so receives may time out while
553         * the connection is stopped.
554         * <P>
555         * If message listeners are running when <CODE>stop</CODE> is invoked, the
556         * <CODE>stop</CODE> call must wait until all of them have returned before
557         * it may return. While these message listeners are completing, they must
558         * have the full services of the connection available to them.
559         *
560         * @throws JMSException if the JMS provider fails to stop message delivery
561         *                 due to some internal error.
562         * @see javax.jms.Connection#start()
563         */
564        @Override
565        public void stop() throws JMSException {
566            doStop(true);
567        }
568    
569        /**
570         * @see #stop()
571         * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed,
572         *                    <tt>false</tt> to skip this check
573         * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
574         */
575        void doStop(boolean checkClosed) throws JMSException {
576            if (checkClosed) {
577                checkClosedOrFailed();
578            }
579            if (started.compareAndSet(true, false)) {
580                synchronized(sessions) {
581                    for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
582                        ActiveMQSession s = i.next();
583                        s.stop();
584                    }
585                }
586            }
587        }
588    
589        /**
590         * Closes the connection.
591         * <P>
592         * Since a provider typically allocates significant resources outside the
593         * JVM on behalf of a connection, clients should close these resources when
594         * they are not needed. Relying on garbage collection to eventually reclaim
595         * these resources may not be timely enough.
596         * <P>
597         * There is no need to close the sessions, producers, and consumers of a
598         * closed connection.
599         * <P>
600         * Closing a connection causes all temporary destinations to be deleted.
601         * <P>
602         * When this method is invoked, it should not return until message
603         * processing has been shut down in an orderly fashion. This means that all
604         * message listeners that may have been running have returned, and that all
605         * pending receives have returned. A close terminates all pending message
606         * receives on the connection's sessions' consumers. The receives may return
607         * with a message or with null, depending on whether there was a message
608         * available at the time of the close. If one or more of the connection's
609         * sessions' message listeners is processing a message at the time when
610         * connection <CODE>close</CODE> is invoked, all the facilities of the
611         * connection and its sessions must remain available to those listeners
612         * until they return control to the JMS provider.
613         * <P>
614         * Closing a connection causes any of its sessions' transactions in progress
615         * to be rolled back. In the case where a session's work is coordinated by
616         * an external transaction manager, a session's <CODE>commit</CODE> and
617         * <CODE> rollback</CODE> methods are not used and the result of a closed
618         * session's work is determined later by the transaction manager. Closing a
619         * connection does NOT force an acknowledgment of client-acknowledged
620         * sessions.
621         * <P>
622         * Invoking the <CODE>acknowledge</CODE> method of a received message from
623         * a closed connection's session must throw an
624         * <CODE>IllegalStateException</CODE>. Closing a closed connection must
625         * NOT throw an exception.
626         *
627         * @throws JMSException if the JMS provider fails to close the connection
628         *                 due to some internal error. For example, a failure to
629         *                 release resources or to close a socket connection can
630         *                 cause this exception to be thrown.
631         */
632        @Override
633        public void close() throws JMSException {
634            // Store the interrupted state and clear so that cleanup happens without
635            // leaking connection resources.  Reset in finally to preserve state.
636            boolean interrupted = Thread.interrupted();
637    
638            try {
639    
640                // If we were running, lets stop first.
641                if (!closed.get() && !transportFailed.get()) {
642                    // do not fail if already closed as according to JMS spec we must not
643                    // throw exception if already closed
644                    doStop(false);
645                }
646    
647                synchronized (this) {
648                    if (!closed.get()) {
649                        closing.set(true);
650    
651                        if (destinationSource != null) {
652                            destinationSource.stop();
653                            destinationSource = null;
654                        }
655                        if (advisoryConsumer != null) {
656                            advisoryConsumer.dispose();
657                            advisoryConsumer = null;
658                        }
659    
660                        Scheduler scheduler = this.scheduler;
661                        if (scheduler != null) {
662                            try {
663                                scheduler.stop();
664                            } catch (Exception e) {
665                                JMSException ex =  JMSExceptionSupport.create(e);
666                                throw ex;
667                            }
668                        }
669    
670                        long lastDeliveredSequenceId = 0;
671                        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
672                            ActiveMQSession s = i.next();
673                            s.dispose();
674                            lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
675                        }
676                        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
677                            ActiveMQConnectionConsumer c = i.next();
678                            c.dispose();
679                        }
680                        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
681                            ActiveMQInputStream c = i.next();
682                            c.dispose();
683                        }
684                        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
685                            ActiveMQOutputStream c = i.next();
686                            c.dispose();
687                        }
688    
689                        this.activeTempDestinations.clear();
690    
691                        if (isConnectionInfoSentToBroker) {
692                            // If we announced ourselves to the broker.. Try to let the broker
693                            // know that the connection is being shutdown.
694                            RemoveInfo removeCommand = info.createRemoveCommand();
695                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
696                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
697                            doAsyncSendPacket(new ShutdownInfo());
698                        }
699    
700                        started.set(false);
701    
702                        // TODO if we move the TaskRunnerFactory to the connection
703                        // factory
704                        // then we may need to call
705                        // factory.onConnectionClose(this);
706                        if (sessionTaskRunner != null) {
707                            sessionTaskRunner.shutdown();
708                        }
709                        closed.set(true);
710                        closing.set(false);
711                    }
712                }
713            } finally {
714                try {
715                    if (executor != null) {
716                        ThreadPoolUtils.shutdown(executor);
717                    }
718                } catch (Throwable e) {
719                    LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
720                }
721    
722                ServiceSupport.dispose(this.transport);
723    
724                factoryStats.removeConnection(this);
725                if (interrupted) {
726                    Thread.currentThread().interrupt();
727                }
728            }
729        }
730    
731        /**
732         * Tells the broker to terminate its VM. This can be used to cleanly
733         * terminate a broker running in a standalone java process. Server must have
734         * property enable.vm.shutdown=true defined to allow this to work.
735         */
736        // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
737        // implemented.
738        /*
739         * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
740         * command = new BrokerAdminCommand();
741         * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
742         * asyncSendPacket(command); }
743         */
744    
745        /**
746         * Create a durable connection consumer for this connection (optional
747         * operation). This is an expert facility not used by regular JMS clients.
748         *
749         * @param topic topic to access
750         * @param subscriptionName durable subscription name
751         * @param messageSelector only messages with properties matching the message
752         *                selector expression are delivered. A value of null or an
753         *                empty string indicates that there is no message selector
754         *                for the message consumer.
755         * @param sessionPool the server session pool to associate with this durable
756         *                connection consumer
757         * @param maxMessages the maximum number of messages that can be assigned to
758         *                a server session at one time
759         * @return the durable connection consumer
760         * @throws JMSException if the <CODE>Connection</CODE> object fails to
761         *                 create a connection consumer due to some internal error
762         *                 or invalid arguments for <CODE>sessionPool</CODE> and
763         *                 <CODE>messageSelector</CODE>.
764         * @throws javax.jms.InvalidDestinationException if an invalid destination
765         *                 is specified.
766         * @throws javax.jms.InvalidSelectorException if the message selector is
767         *                 invalid.
768         * @see javax.jms.ConnectionConsumer
769         * @since 1.1
770         */
771        @Override
772        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
773            throws JMSException {
774            return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
775        }
776    
777        /**
778         * Create a durable connection consumer for this connection (optional
779         * operation). This is an expert facility not used by regular JMS clients.
780         *
781         * @param topic topic to access
782         * @param subscriptionName durable subscription name
783         * @param messageSelector only messages with properties matching the message
784         *                selector expression are delivered. A value of null or an
785         *                empty string indicates that there is no message selector
786         *                for the message consumer.
787         * @param sessionPool the server session pool to associate with this durable
788         *                connection consumer
789         * @param maxMessages the maximum number of messages that can be assigned to
790         *                a server session at one time
791         * @param noLocal set true if you want to filter out messages published
792         *                locally
793         * @return the durable connection consumer
794         * @throws JMSException if the <CODE>Connection</CODE> object fails to
795         *                 create a connection consumer due to some internal error
796         *                 or invalid arguments for <CODE>sessionPool</CODE> and
797         *                 <CODE>messageSelector</CODE>.
798         * @throws javax.jms.InvalidDestinationException if an invalid destination
799         *                 is specified.
800         * @throws javax.jms.InvalidSelectorException if the message selector is
801         *                 invalid.
802         * @see javax.jms.ConnectionConsumer
803         * @since 1.1
804         */
805        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
806                                                                  boolean noLocal) throws JMSException {
807            checkClosedOrFailed();
808    
809            if (queueOnlyConnection) {
810                throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
811            }
812    
813            ensureConnectionInfoSent();
814            SessionId sessionId = new SessionId(info.getConnectionId(), -1);
815            ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
816            info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
817            info.setSubscriptionName(subscriptionName);
818            info.setSelector(messageSelector);
819            info.setPrefetchSize(maxMessages);
820            info.setDispatchAsync(isDispatchAsync());
821    
822            // Allows the options on the destination to configure the consumerInfo
823            if (info.getDestination().getOptions() != null) {
824                Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
825                IntrospectionSupport.setProperties(this.info, options, "consumer.");
826            }
827    
828            return new ActiveMQConnectionConsumer(this, sessionPool, info);
829        }
830    
831        // Properties
832        // -------------------------------------------------------------------------
833    
834        /**
835         * Returns true if this connection has been started
836         *
837         * @return true if this Connection is started
838         */
839        public boolean isStarted() {
840            return started.get();
841        }
842    
843        /**
844         * Returns true if the connection is closed
845         */
846        public boolean isClosed() {
847            return closed.get();
848        }
849    
850        /**
851         * Returns true if the connection is in the process of being closed
852         */
853        public boolean isClosing() {
854            return closing.get();
855        }
856    
857        /**
858         * Returns true if the underlying transport has failed
859         */
860        public boolean isTransportFailed() {
861            return transportFailed.get();
862        }
863    
864        /**
865         * @return Returns the prefetchPolicy.
866         */
867        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
868            return prefetchPolicy;
869        }
870    
871        /**
872         * Sets the <a
873         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
874         * policy</a> for consumers created by this connection.
875         */
876        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
877            this.prefetchPolicy = prefetchPolicy;
878        }
879    
880        /**
881         */
882        public Transport getTransportChannel() {
883            return transport;
884        }
885    
886        /**
887         * @return Returns the clientID of the connection, forcing one to be
888         *         generated if one has not yet been configured.
889         */
890        public String getInitializedClientID() throws JMSException {
891            ensureConnectionInfoSent();
892            return info.getClientId();
893        }
894    
895        /**
896         * @return Returns the timeStampsDisableByDefault.
897         */
898        public boolean isDisableTimeStampsByDefault() {
899            return disableTimeStampsByDefault;
900        }
901    
902        /**
903         * Sets whether or not timestamps on messages should be disabled or not. If
904         * you disable them it adds a small performance boost.
905         */
906        public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
907            this.disableTimeStampsByDefault = timeStampsDisableByDefault;
908        }
909    
910        /**
911         * @return Returns the dispatchOptimizedMessage.
912         */
913        public boolean isOptimizedMessageDispatch() {
914            return optimizedMessageDispatch;
915        }
916    
917        /**
918         * If this flag is set then an larger prefetch limit is used - only
919         * applicable for durable topic subscribers.
920         */
921        public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
922            this.optimizedMessageDispatch = dispatchOptimizedMessage;
923        }
924    
925        /**
926         * @return Returns the closeTimeout.
927         */
928        public int getCloseTimeout() {
929            return closeTimeout;
930        }
931    
932        /**
933         * Sets the timeout before a close is considered complete. Normally a
934         * close() on a connection waits for confirmation from the broker; this
935         * allows that operation to timeout to save the client hanging if there is
936         * no broker
937         */
938        public void setCloseTimeout(int closeTimeout) {
939            this.closeTimeout = closeTimeout;
940        }
941    
942        /**
943         * @return ConnectionInfo
944         */
945        public ConnectionInfo getConnectionInfo() {
946            return this.info;
947        }
948    
949        public boolean isUseRetroactiveConsumer() {
950            return useRetroactiveConsumer;
951        }
952    
953        /**
954         * Sets whether or not retroactive consumers are enabled. Retroactive
955         * consumers allow non-durable topic subscribers to receive old messages
956         * that were published before the non-durable subscriber started.
957         */
958        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
959            this.useRetroactiveConsumer = useRetroactiveConsumer;
960        }
961    
962        public boolean isNestedMapAndListEnabled() {
963            return nestedMapAndListEnabled;
964        }
965    
966        /**
967         * Enables/disables whether or not Message properties and MapMessage entries
968         * support <a
969         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
970         * Structures</a> of Map and List objects
971         */
972        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
973            this.nestedMapAndListEnabled = structuredMapsEnabled;
974        }
975    
976        public boolean isExclusiveConsumer() {
977            return exclusiveConsumer;
978        }
979    
980        /**
981         * Enables or disables whether or not queue consumers should be exclusive or
982         * not for example to preserve ordering when not using <a
983         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
984         *
985         * @param exclusiveConsumer
986         */
987        public void setExclusiveConsumer(boolean exclusiveConsumer) {
988            this.exclusiveConsumer = exclusiveConsumer;
989        }
990    
991        /**
992         * Adds a transport listener so that a client can be notified of events in
993         * the underlying transport
994         */
995        public void addTransportListener(TransportListener transportListener) {
996            transportListeners.add(transportListener);
997        }
998    
999        public void removeTransportListener(TransportListener transportListener) {
1000            transportListeners.remove(transportListener);
1001        }
1002    
1003        public boolean isUseDedicatedTaskRunner() {
1004            return useDedicatedTaskRunner;
1005        }
1006    
1007        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1008            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1009        }
1010    
1011        public TaskRunnerFactory getSessionTaskRunner() {
1012            synchronized (this) {
1013                if (sessionTaskRunner == null) {
1014                    sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1015                    sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1016                }
1017            }
1018            return sessionTaskRunner;
1019        }
1020    
1021        public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1022            this.sessionTaskRunner = sessionTaskRunner;
1023        }
1024    
1025        public MessageTransformer getTransformer() {
1026            return transformer;
1027        }
1028    
1029        /**
1030         * Sets the transformer used to transform messages before they are sent on
1031         * to the JMS bus or when they are received from the bus but before they are
1032         * delivered to the JMS client
1033         */
1034        public void setTransformer(MessageTransformer transformer) {
1035            this.transformer = transformer;
1036        }
1037    
1038        /**
1039         * @return the statsEnabled
1040         */
1041        public boolean isStatsEnabled() {
1042            return this.stats.isEnabled();
1043        }
1044    
1045        /**
1046         * @param statsEnabled the statsEnabled to set
1047         */
1048        public void setStatsEnabled(boolean statsEnabled) {
1049            this.stats.setEnabled(statsEnabled);
1050        }
1051    
1052        /**
1053         * Returns the {@link DestinationSource} object which can be used to listen to destinations
1054         * being created or destroyed or to enquire about the current destinations available on the broker
1055         *
1056         * @return a lazily created destination source
1057         * @throws JMSException
1058         */
1059        @Override
1060        public DestinationSource getDestinationSource() throws JMSException {
1061            if (destinationSource == null) {
1062                destinationSource = new DestinationSource(this);
1063                destinationSource.start();
1064            }
1065            return destinationSource;
1066        }
1067    
1068        // Implementation methods
1069        // -------------------------------------------------------------------------
1070    
1071        /**
1072         * Used internally for adding Sessions to the Connection
1073         *
1074         * @param session
1075         * @throws JMSException
1076         * @throws JMSException
1077         */
1078        protected void addSession(ActiveMQSession session) throws JMSException {
1079            this.sessions.add(session);
1080            if (sessions.size() > 1 || session.isTransacted()) {
1081                optimizedMessageDispatch = false;
1082            }
1083        }
1084    
1085        /**
1086         * Used interanlly for removing Sessions from a Connection
1087         *
1088         * @param session
1089         */
1090        protected void removeSession(ActiveMQSession session) {
1091            this.sessions.remove(session);
1092            this.removeDispatcher(session);
1093        }
1094    
1095        /**
1096         * Add a ConnectionConsumer
1097         *
1098         * @param connectionConsumer
1099         * @throws JMSException
1100         */
1101        protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1102            this.connectionConsumers.add(connectionConsumer);
1103        }
1104    
1105        /**
1106         * Remove a ConnectionConsumer
1107         *
1108         * @param connectionConsumer
1109         */
1110        protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1111            this.connectionConsumers.remove(connectionConsumer);
1112            this.removeDispatcher(connectionConsumer);
1113        }
1114    
1115        /**
1116         * Creates a <CODE>TopicSession</CODE> object.
1117         *
1118         * @param transacted indicates whether the session is transacted
1119         * @param acknowledgeMode indicates whether the consumer or the client will
1120         *                acknowledge any messages it receives; ignored if the
1121         *                session is transacted. Legal values are
1122         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1123         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1124         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1125         * @return a newly created topic session
1126         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1127         *                 to create a session due to some internal error or lack of
1128         *                 support for the specific transaction and acknowledgement
1129         *                 mode.
1130         * @see Session#AUTO_ACKNOWLEDGE
1131         * @see Session#CLIENT_ACKNOWLEDGE
1132         * @see Session#DUPS_OK_ACKNOWLEDGE
1133         */
1134        @Override
1135        public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1136            return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1137        }
1138    
1139        /**
1140         * Creates a connection consumer for this connection (optional operation).
1141         * This is an expert facility not used by regular JMS clients.
1142         *
1143         * @param topic the topic to access
1144         * @param messageSelector only messages with properties matching the message
1145         *                selector expression are delivered. A value of null or an
1146         *                empty string indicates that there is no message selector
1147         *                for the message consumer.
1148         * @param sessionPool the server session pool to associate with this
1149         *                connection consumer
1150         * @param maxMessages the maximum number of messages that can be assigned to
1151         *                a server session at one time
1152         * @return the connection consumer
1153         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1154         *                 to create a connection consumer due to some internal
1155         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1156         *                 and <CODE>messageSelector</CODE>.
1157         * @throws javax.jms.InvalidDestinationException if an invalid topic is
1158         *                 specified.
1159         * @throws javax.jms.InvalidSelectorException if the message selector is
1160         *                 invalid.
1161         * @see javax.jms.ConnectionConsumer
1162         */
1163        @Override
1164        public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1165            return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1166        }
1167    
1168        /**
1169         * Creates a connection consumer for this connection (optional operation).
1170         * This is an expert facility not used by regular JMS clients.
1171         *
1172         * @param queue the queue to access
1173         * @param messageSelector only messages with properties matching the message
1174         *                selector expression are delivered. A value of null or an
1175         *                empty string indicates that there is no message selector
1176         *                for the message consumer.
1177         * @param sessionPool the server session pool to associate with this
1178         *                connection consumer
1179         * @param maxMessages the maximum number of messages that can be assigned to
1180         *                a server session at one time
1181         * @return the connection consumer
1182         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1183         *                 to create a connection consumer due to some internal
1184         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1185         *                 and <CODE>messageSelector</CODE>.
1186         * @throws javax.jms.InvalidDestinationException if an invalid queue is
1187         *                 specified.
1188         * @throws javax.jms.InvalidSelectorException if the message selector is
1189         *                 invalid.
1190         * @see javax.jms.ConnectionConsumer
1191         */
1192        @Override
1193        public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1194            return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1195        }
1196    
1197        /**
1198         * Creates a connection consumer for this connection (optional operation).
1199         * This is an expert facility not used by regular JMS clients.
1200         *
1201         * @param destination the destination to access
1202         * @param messageSelector only messages with properties matching the message
1203         *                selector expression are delivered. A value of null or an
1204         *                empty string indicates that there is no message selector
1205         *                for the message consumer.
1206         * @param sessionPool the server session pool to associate with this
1207         *                connection consumer
1208         * @param maxMessages the maximum number of messages that can be assigned to
1209         *                a server session at one time
1210         * @return the connection consumer
1211         * @throws JMSException if the <CODE>Connection</CODE> object fails to
1212         *                 create a connection consumer due to some internal error
1213         *                 or invalid arguments for <CODE>sessionPool</CODE> and
1214         *                 <CODE>messageSelector</CODE>.
1215         * @throws javax.jms.InvalidDestinationException if an invalid destination
1216         *                 is specified.
1217         * @throws javax.jms.InvalidSelectorException if the message selector is
1218         *                 invalid.
1219         * @see javax.jms.ConnectionConsumer
1220         * @since 1.1
1221         */
1222        @Override
1223        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1224            return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1225        }
1226    
1227        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1228            throws JMSException {
1229    
1230            checkClosedOrFailed();
1231            ensureConnectionInfoSent();
1232    
1233            ConsumerId consumerId = createConsumerId();
1234            ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1235            consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1236            consumerInfo.setSelector(messageSelector);
1237            consumerInfo.setPrefetchSize(maxMessages);
1238            consumerInfo.setNoLocal(noLocal);
1239            consumerInfo.setDispatchAsync(isDispatchAsync());
1240    
1241            // Allows the options on the destination to configure the consumerInfo
1242            if (consumerInfo.getDestination().getOptions() != null) {
1243                Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1244                IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1245            }
1246    
1247            return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1248        }
1249    
1250        /**
1251         * @return
1252         */
1253        private ConsumerId createConsumerId() {
1254            return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1255        }
1256    
1257        /**
1258         * @return
1259         */
1260        private ProducerId createProducerId() {
1261            return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1262        }
1263    
1264        /**
1265         * Creates a <CODE>QueueSession</CODE> object.
1266         *
1267         * @param transacted indicates whether the session is transacted
1268         * @param acknowledgeMode indicates whether the consumer or the client will
1269         *                acknowledge any messages it receives; ignored if the
1270         *                session is transacted. Legal values are
1271         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1272         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1273         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1274         * @return a newly created queue session
1275         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1276         *                 to create a session due to some internal error or lack of
1277         *                 support for the specific transaction and acknowledgement
1278         *                 mode.
1279         * @see Session#AUTO_ACKNOWLEDGE
1280         * @see Session#CLIENT_ACKNOWLEDGE
1281         * @see Session#DUPS_OK_ACKNOWLEDGE
1282         */
1283        @Override
1284        public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1285            return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1286        }
1287    
1288        /**
1289         * Ensures that the clientID was manually specified and not auto-generated.
1290         * If the clientID was not specified this method will throw an exception.
1291         * This method is used to ensure that the clientID + durableSubscriber name
1292         * are used correctly.
1293         *
1294         * @throws JMSException
1295         */
1296        public void checkClientIDWasManuallySpecified() throws JMSException {
1297            if (!userSpecifiedClientID) {
1298                throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1299            }
1300        }
1301    
1302        /**
1303         * send a Packet through the Connection - for internal use only
1304         *
1305         * @param command
1306         * @throws JMSException
1307         */
1308        public void asyncSendPacket(Command command) throws JMSException {
1309            if (isClosed()) {
1310                throw new ConnectionClosedException();
1311            } else {
1312                doAsyncSendPacket(command);
1313            }
1314        }
1315    
1316        private void doAsyncSendPacket(Command command) throws JMSException {
1317            try {
1318                this.transport.oneway(command);
1319            } catch (IOException e) {
1320                throw JMSExceptionSupport.create(e);
1321            }
1322        }
1323    
1324        /**
1325         * Send a packet through a Connection - for internal use only
1326         *
1327         * @param command
1328         * @return
1329         * @throws JMSException
1330         */
1331        public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1332            if(onComplete==null) {
1333                syncSendPacket(command);
1334            } else {
1335                if (isClosed()) {
1336                    throw new ConnectionClosedException();
1337                }
1338                try {
1339                    this.transport.asyncRequest(command, new ResponseCallback() {
1340                        @Override
1341                        public void onCompletion(FutureResponse resp) {
1342                            Response response;
1343                            Throwable exception = null;
1344                            try {
1345                                response = resp.getResult();
1346                                if (response.isException()) {
1347                                    ExceptionResponse er = (ExceptionResponse)response;
1348                                    exception = er.getException();
1349                                }
1350                            } catch (Exception e) {
1351                                exception = e;
1352                            }
1353                            if(exception!=null) {
1354                                if ( exception instanceof JMSException) {
1355                                    onComplete.onException((JMSException) exception);
1356                                } else {
1357                                    if (isClosed()||closing.get()) {
1358                                        LOG.debug("Received an exception but connection is closing");
1359                                    }
1360                                    JMSException jmsEx = null;
1361                                    try {
1362                                        jmsEx = JMSExceptionSupport.create(exception);
1363                                    } catch(Throwable e) {
1364                                        LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1365                                    }
1366                                    // dispose of transport for security exceptions on connection initiation
1367                                    if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1368                                        Transport t = transport;
1369                                        if (null != t){
1370                                            ServiceSupport.dispose(t);
1371                                        }
1372                                    }
1373                                    if (jmsEx !=null) {
1374                                        onComplete.onException(jmsEx);
1375                                    }
1376                                }
1377                            } else {
1378                                onComplete.onSuccess();
1379                            }
1380                        }
1381                    });
1382                } catch (IOException e) {
1383                    throw JMSExceptionSupport.create(e);
1384                }
1385            }
1386        }
1387    
1388        public Response syncSendPacket(Command command) throws JMSException {
1389            if (isClosed()) {
1390                throw new ConnectionClosedException();
1391            } else {
1392    
1393                try {
1394                    Response response = (Response)this.transport.request(command);
1395                    if (response.isException()) {
1396                        ExceptionResponse er = (ExceptionResponse)response;
1397                        if (er.getException() instanceof JMSException) {
1398                            throw (JMSException)er.getException();
1399                        } else {
1400                            if (isClosed()||closing.get()) {
1401                                LOG.debug("Received an exception but connection is closing");
1402                            }
1403                            JMSException jmsEx = null;
1404                            try {
1405                                jmsEx = JMSExceptionSupport.create(er.getException());
1406                            } catch(Throwable e) {
1407                                LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1408                            }
1409                            //dispose of transport for security exceptions
1410                            if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1411                                Transport t = this.transport;
1412                                if (null != t){
1413                                    ServiceSupport.dispose(t);
1414                                }
1415                            }
1416                            if (jmsEx !=null) {
1417                                throw jmsEx;
1418                            }
1419                        }
1420                    }
1421                    return response;
1422                } catch (IOException e) {
1423                    throw JMSExceptionSupport.create(e);
1424                }
1425            }
1426        }
1427    
1428        /**
1429         * Send a packet through a Connection - for internal use only
1430         *
1431         * @param command
1432         * @return
1433         * @throws JMSException
1434         */
1435        public Response syncSendPacket(Command command, int timeout) throws JMSException {
1436            if (isClosed() || closing.get()) {
1437                throw new ConnectionClosedException();
1438            } else {
1439                return doSyncSendPacket(command, timeout);
1440            }
1441        }
1442    
1443        private Response doSyncSendPacket(Command command, int timeout)
1444                throws JMSException {
1445            try {
1446                Response response = (Response) (timeout > 0
1447                        ? this.transport.request(command, timeout)
1448                        : this.transport.request(command));
1449                if (response != null && response.isException()) {
1450                    ExceptionResponse er = (ExceptionResponse)response;
1451                    if (er.getException() instanceof JMSException) {
1452                        throw (JMSException)er.getException();
1453                    } else {
1454                        throw JMSExceptionSupport.create(er.getException());
1455                    }
1456                }
1457                return response;
1458            } catch (IOException e) {
1459                throw JMSExceptionSupport.create(e);
1460            }
1461        }
1462    
1463        /**
1464         * @return statistics for this Connection
1465         */
1466        @Override
1467        public StatsImpl getStats() {
1468            return stats;
1469        }
1470    
1471        /**
1472         * simply throws an exception if the Connection is already closed or the
1473         * Transport has failed
1474         *
1475         * @throws JMSException
1476         */
1477        protected synchronized void checkClosedOrFailed() throws JMSException {
1478            checkClosed();
1479            if (transportFailed.get()) {
1480                throw new ConnectionFailedException(firstFailureError);
1481            }
1482        }
1483    
1484        /**
1485         * simply throws an exception if the Connection is already closed
1486         *
1487         * @throws JMSException
1488         */
1489        protected synchronized void checkClosed() throws JMSException {
1490            if (closed.get()) {
1491                throw new ConnectionClosedException();
1492            }
1493        }
1494    
1495        /**
1496         * Send the ConnectionInfo to the Broker
1497         *
1498         * @throws JMSException
1499         */
1500        protected void ensureConnectionInfoSent() throws JMSException {
1501            synchronized(this.ensureConnectionInfoSentMutex) {
1502                // Can we skip sending the ConnectionInfo packet??
1503                if (isConnectionInfoSentToBroker || closed.get()) {
1504                    return;
1505                }
1506                //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1507                if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1508                    info.setClientId(clientIdGenerator.generateId());
1509                }
1510                syncSendPacket(info.copy());
1511    
1512                this.isConnectionInfoSentToBroker = true;
1513                // Add a temp destination advisory consumer so that
1514                // We know what the valid temporary destinations are on the
1515                // broker without having to do an RPC to the broker.
1516    
1517                ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1518                if (watchTopicAdvisories) {
1519                    advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1520                }
1521            }
1522        }
1523    
1524        public synchronized boolean isWatchTopicAdvisories() {
1525            return watchTopicAdvisories;
1526        }
1527    
1528        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1529            this.watchTopicAdvisories = watchTopicAdvisories;
1530        }
1531    
1532        /**
1533         * @return Returns the useAsyncSend.
1534         */
1535        public boolean isUseAsyncSend() {
1536            return useAsyncSend;
1537        }
1538    
1539        /**
1540         * Forces the use of <a
1541         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1542         * adds a massive performance boost; but means that the send() method will
1543         * return immediately whether the message has been sent or not which could
1544         * lead to message loss.
1545         */
1546        public void setUseAsyncSend(boolean useAsyncSend) {
1547            this.useAsyncSend = useAsyncSend;
1548        }
1549    
1550        /**
1551         * @return true if always sync send messages
1552         */
1553        public boolean isAlwaysSyncSend() {
1554            return this.alwaysSyncSend;
1555        }
1556    
1557        /**
1558         * Set true if always require messages to be sync sent
1559         *
1560         * @param alwaysSyncSend
1561         */
1562        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1563            this.alwaysSyncSend = alwaysSyncSend;
1564        }
1565    
1566        /**
1567         * @return the messagePrioritySupported
1568         */
1569        public boolean isMessagePrioritySupported() {
1570            return this.messagePrioritySupported;
1571        }
1572    
1573        /**
1574         * @param messagePrioritySupported the messagePrioritySupported to set
1575         */
1576        public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1577            this.messagePrioritySupported = messagePrioritySupported;
1578        }
1579    
1580        /**
1581         * Cleans up this connection so that it's state is as if the connection was
1582         * just created. This allows the Resource Adapter to clean up a connection
1583         * so that it can be reused without having to close and recreate the
1584         * connection.
1585         */
1586        public void cleanup() throws JMSException {
1587    
1588            if (advisoryConsumer != null && !isTransportFailed()) {
1589                advisoryConsumer.dispose();
1590                advisoryConsumer = null;
1591            }
1592    
1593            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1594                ActiveMQSession s = i.next();
1595                s.dispose();
1596            }
1597            for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1598                ActiveMQConnectionConsumer c = i.next();
1599                c.dispose();
1600            }
1601            for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1602                ActiveMQInputStream c = i.next();
1603                c.dispose();
1604            }
1605            for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1606                ActiveMQOutputStream c = i.next();
1607                c.dispose();
1608            }
1609    
1610            if (isConnectionInfoSentToBroker) {
1611                if (!transportFailed.get() && !closing.get()) {
1612                    syncSendPacket(info.createRemoveCommand());
1613                }
1614                isConnectionInfoSentToBroker = false;
1615            }
1616            if (userSpecifiedClientID) {
1617                info.setClientId(null);
1618                userSpecifiedClientID = false;
1619            }
1620            clientIDSet = false;
1621    
1622            started.set(false);
1623        }
1624    
1625        @Override
1626        public void finalize() throws Throwable{
1627            Scheduler s = this.scheduler;
1628            if (s != null){
1629                s.stop();
1630            }
1631        }
1632    
1633        /**
1634         * Changes the associated username/password that is associated with this
1635         * connection. If the connection has been used, you must called cleanup()
1636         * before calling this method.
1637         *
1638         * @throws IllegalStateException if the connection is in used.
1639         */
1640        public void changeUserInfo(String userName, String password) throws JMSException {
1641            if (isConnectionInfoSentToBroker) {
1642                throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1643            }
1644            this.info.setUserName(userName);
1645            this.info.setPassword(password);
1646        }
1647    
1648        /**
1649         * @return Returns the resourceManagerId.
1650         * @throws JMSException
1651         */
1652        public String getResourceManagerId() throws JMSException {
1653            waitForBrokerInfo();
1654            if (brokerInfo == null) {
1655                throw new JMSException("Connection failed before Broker info was received.");
1656            }
1657            return brokerInfo.getBrokerId().getValue();
1658        }
1659    
1660        /**
1661         * Returns the broker name if one is available or null if one is not
1662         * available yet.
1663         */
1664        public String getBrokerName() {
1665            try {
1666                brokerInfoReceived.await(5, TimeUnit.SECONDS);
1667                if (brokerInfo == null) {
1668                    return null;
1669                }
1670                return brokerInfo.getBrokerName();
1671            } catch (InterruptedException e) {
1672                Thread.currentThread().interrupt();
1673                return null;
1674            }
1675        }
1676    
1677        /**
1678         * Returns the broker information if it is available or null if it is not
1679         * available yet.
1680         */
1681        public BrokerInfo getBrokerInfo() {
1682            return brokerInfo;
1683        }
1684    
1685        /**
1686         * @return Returns the RedeliveryPolicy.
1687         * @throws JMSException
1688         */
1689        public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1690            return redeliveryPolicyMap.getDefaultEntry();
1691        }
1692    
1693        /**
1694         * Sets the redelivery policy to be used when messages are rolled back
1695         */
1696        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1697            this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1698        }
1699    
1700        public BlobTransferPolicy getBlobTransferPolicy() {
1701            if (blobTransferPolicy == null) {
1702                blobTransferPolicy = createBlobTransferPolicy();
1703            }
1704            return blobTransferPolicy;
1705        }
1706    
1707        /**
1708         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1709         * OBjects) are transferred from producers to brokers to consumers
1710         */
1711        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1712            this.blobTransferPolicy = blobTransferPolicy;
1713        }
1714    
1715        /**
1716         * @return Returns the alwaysSessionAsync.
1717         */
1718        public boolean isAlwaysSessionAsync() {
1719            return alwaysSessionAsync;
1720        }
1721    
1722        /**
1723         * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1724         * the Connection. However, a separate thread is always used if there is more than one session, or the session
1725         * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1726         * happens asynchronously.
1727         */
1728        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1729            this.alwaysSessionAsync = alwaysSessionAsync;
1730        }
1731    
1732        /**
1733         * @return Returns the optimizeAcknowledge.
1734         */
1735        public boolean isOptimizeAcknowledge() {
1736            return optimizeAcknowledge;
1737        }
1738    
1739        /**
1740         * Enables an optimised acknowledgement mode where messages are acknowledged
1741         * in batches rather than individually
1742         *
1743         * @param optimizeAcknowledge The optimizeAcknowledge to set.
1744         */
1745        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1746            this.optimizeAcknowledge = optimizeAcknowledge;
1747        }
1748    
1749        /**
1750         * The max time in milliseconds between optimized ack batches
1751         * @param optimizeAcknowledgeTimeOut
1752         */
1753        public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1754            this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1755        }
1756    
1757        public long getOptimizeAcknowledgeTimeOut() {
1758            return optimizeAcknowledgeTimeOut;
1759        }
1760    
1761        public long getWarnAboutUnstartedConnectionTimeout() {
1762            return warnAboutUnstartedConnectionTimeout;
1763        }
1764    
1765        /**
1766         * Enables the timeout from a connection creation to when a warning is
1767         * generated if the connection is not properly started via {@link #start()}
1768         * and a message is received by a consumer. It is a very common gotcha to
1769         * forget to <a
1770         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1771         * the connection</a> so this option makes the default case to create a
1772         * warning if the user forgets. To disable the warning just set the value to <
1773         * 0 (say -1).
1774         */
1775        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1776            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1777        }
1778    
1779        /**
1780         * @return the sendTimeout
1781         */
1782        public int getSendTimeout() {
1783            return sendTimeout;
1784        }
1785    
1786        /**
1787         * @param sendTimeout the sendTimeout to set
1788         */
1789        public void setSendTimeout(int sendTimeout) {
1790            this.sendTimeout = sendTimeout;
1791        }
1792    
1793        /**
1794         * @return the sendAcksAsync
1795         */
1796        public boolean isSendAcksAsync() {
1797            return sendAcksAsync;
1798        }
1799    
1800        /**
1801         * @param sendAcksAsync the sendAcksAsync to set
1802         */
1803        public void setSendAcksAsync(boolean sendAcksAsync) {
1804            this.sendAcksAsync = sendAcksAsync;
1805        }
1806    
1807        /**
1808         * Returns the time this connection was created
1809         */
1810        public long getTimeCreated() {
1811            return timeCreated;
1812        }
1813    
1814        private void waitForBrokerInfo() throws JMSException {
1815            try {
1816                brokerInfoReceived.await();
1817            } catch (InterruptedException e) {
1818                Thread.currentThread().interrupt();
1819                throw JMSExceptionSupport.create(e);
1820            }
1821        }
1822    
1823        // Package protected so that it can be used in unit tests
1824        public Transport getTransport() {
1825            return transport;
1826        }
1827    
1828        public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1829            producers.put(producerId, producer);
1830        }
1831    
1832        public void removeProducer(ProducerId producerId) {
1833            producers.remove(producerId);
1834        }
1835    
1836        public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1837            dispatchers.put(consumerId, dispatcher);
1838        }
1839    
1840        public void removeDispatcher(ConsumerId consumerId) {
1841            dispatchers.remove(consumerId);
1842        }
1843    
1844        /**
1845         * @param o - the command to consume
1846         */
1847        @Override
1848        public void onCommand(final Object o) {
1849            final Command command = (Command)o;
1850            if (!closed.get() && command != null) {
1851                try {
1852                    command.visit(new CommandVisitorAdapter() {
1853                        @Override
1854                        public Response processMessageDispatch(MessageDispatch md) throws Exception {
1855                            waitForTransportInterruptionProcessingToComplete();
1856                            ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1857                            if (dispatcher != null) {
1858                                // Copy in case a embedded broker is dispatching via
1859                                // vm://
1860                                // md.getMessage() == null to signal end of queue
1861                                // browse.
1862                                Message msg = md.getMessage();
1863                                if (msg != null) {
1864                                    msg = msg.copy();
1865                                    msg.setReadOnlyBody(true);
1866                                    msg.setReadOnlyProperties(true);
1867                                    msg.setRedeliveryCounter(md.getRedeliveryCounter());
1868                                    msg.setConnection(ActiveMQConnection.this);
1869                                    msg.setMemoryUsage(null);
1870                                    md.setMessage(msg);
1871                                }
1872                                dispatcher.dispatch(md);
1873                            }
1874                            return null;
1875                        }
1876    
1877                        @Override
1878                        public Response processProducerAck(ProducerAck pa) throws Exception {
1879                            if (pa != null && pa.getProducerId() != null) {
1880                                ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1881                                if (producer != null) {
1882                                    producer.onProducerAck(pa);
1883                                }
1884                            }
1885                            return null;
1886                        }
1887    
1888                        @Override
1889                        public Response processBrokerInfo(BrokerInfo info) throws Exception {
1890                            brokerInfo = info;
1891                            brokerInfoReceived.countDown();
1892                            optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1893                            getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1894                            return null;
1895                        }
1896    
1897                        @Override
1898                        public Response processConnectionError(final ConnectionError error) throws Exception {
1899                            executor.execute(new Runnable() {
1900                                @Override
1901                                public void run() {
1902                                    onAsyncException(error.getException());
1903                                }
1904                            });
1905                            return null;
1906                        }
1907    
1908                        @Override
1909                        public Response processControlCommand(ControlCommand command) throws Exception {
1910                            onControlCommand(command);
1911                            return null;
1912                        }
1913    
1914                        @Override
1915                        public Response processConnectionControl(ConnectionControl control) throws Exception {
1916                            onConnectionControl((ConnectionControl)command);
1917                            return null;
1918                        }
1919    
1920                        @Override
1921                        public Response processConsumerControl(ConsumerControl control) throws Exception {
1922                            onConsumerControl((ConsumerControl)command);
1923                            return null;
1924                        }
1925    
1926                        @Override
1927                        public Response processWireFormat(WireFormatInfo info) throws Exception {
1928                            onWireFormatInfo((WireFormatInfo)command);
1929                            return null;
1930                        }
1931                    });
1932                } catch (Exception e) {
1933                    onClientInternalException(e);
1934                }
1935            }
1936    
1937            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1938                TransportListener listener = iter.next();
1939                listener.onCommand(command);
1940            }
1941        }
1942    
1943        protected void onWireFormatInfo(WireFormatInfo info) {
1944            protocolVersion.set(info.getVersion());
1945        }
1946    
1947        /**
1948         * Handles async client internal exceptions.
1949         * A client internal exception is usually one that has been thrown
1950         * by a container runtime component during asynchronous processing of a
1951         * message that does not affect the connection itself.
1952         * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1953         * its <code>onException</code> method, if one has been registered with this connection.
1954         *
1955         * @param error the exception that the problem
1956         */
1957        public void onClientInternalException(final Throwable error) {
1958            if ( !closed.get() && !closing.get() ) {
1959                if ( this.clientInternalExceptionListener != null ) {
1960                    executor.execute(new Runnable() {
1961                        @Override
1962                        public void run() {
1963                            ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1964                        }
1965                    });
1966                } else {
1967                    LOG.debug("Async client internal exception occurred with no exception listener registered: "
1968                            + error, error);
1969                }
1970            }
1971        }
1972    
1973        /**
1974         * Used for handling async exceptions
1975         *
1976         * @param error
1977         */
1978        public void onAsyncException(Throwable error) {
1979            if (!closed.get() && !closing.get()) {
1980                if (this.exceptionListener != null) {
1981    
1982                    if (!(error instanceof JMSException)) {
1983                        error = JMSExceptionSupport.create(error);
1984                    }
1985                    final JMSException e = (JMSException)error;
1986    
1987                    executor.execute(new Runnable() {
1988                        @Override
1989                        public void run() {
1990                            ActiveMQConnection.this.exceptionListener.onException(e);
1991                        }
1992                    });
1993    
1994                } else {
1995                    LOG.debug("Async exception with no exception listener: " + error, error);
1996                }
1997            }
1998        }
1999    
2000        @Override
2001        public void onException(final IOException error) {
2002            onAsyncException(error);
2003            if (!closing.get() && !closed.get()) {
2004                executor.execute(new Runnable() {
2005                    @Override
2006                    public void run() {
2007                        transportFailed(error);
2008                        ServiceSupport.dispose(ActiveMQConnection.this.transport);
2009                        brokerInfoReceived.countDown();
2010                        try {
2011                            cleanup();
2012                        } catch (JMSException e) {
2013                            LOG.warn("Exception during connection cleanup, " + e, e);
2014                        }
2015                        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2016                            TransportListener listener = iter.next();
2017                            listener.onException(error);
2018                        }
2019                    }
2020                });
2021            }
2022        }
2023    
2024        @Override
2025        public void transportInterupted() {
2026            transportInterruptionProcessingComplete.set(1);
2027            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2028                ActiveMQSession s = i.next();
2029                s.clearMessagesInProgress(transportInterruptionProcessingComplete);
2030            }
2031    
2032            for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
2033                connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
2034            }
2035    
2036            if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
2037                if (LOG.isDebugEnabled()) {
2038                    LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
2039                }
2040                signalInterruptionProcessingNeeded();
2041            }
2042    
2043            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2044                TransportListener listener = iter.next();
2045                listener.transportInterupted();
2046            }
2047        }
2048    
2049        @Override
2050        public void transportResumed() {
2051            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2052                TransportListener listener = iter.next();
2053                listener.transportResumed();
2054            }
2055        }
2056    
2057        /**
2058         * Create the DestinationInfo object for the temporary destination.
2059         *
2060         * @param topic - if its true topic, else queue.
2061         * @return DestinationInfo
2062         * @throws JMSException
2063         */
2064        protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2065    
2066            // Check if Destination info is of temporary type.
2067            ActiveMQTempDestination dest;
2068            if (topic) {
2069                dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2070            } else {
2071                dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2072            }
2073    
2074            DestinationInfo info = new DestinationInfo();
2075            info.setConnectionId(this.info.getConnectionId());
2076            info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2077            info.setDestination(dest);
2078            syncSendPacket(info);
2079    
2080            dest.setConnection(this);
2081            activeTempDestinations.put(dest, dest);
2082            return dest;
2083        }
2084    
2085        /**
2086         * @param destination
2087         * @throws JMSException
2088         */
2089        public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2090    
2091            checkClosedOrFailed();
2092    
2093            for (ActiveMQSession session : this.sessions) {
2094                if (session.isInUse(destination)) {
2095                    throw new JMSException("A consumer is consuming from the temporary destination");
2096                }
2097            }
2098    
2099            activeTempDestinations.remove(destination);
2100    
2101            DestinationInfo destInfo = new DestinationInfo();
2102            destInfo.setConnectionId(this.info.getConnectionId());
2103            destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2104            destInfo.setDestination(destination);
2105            destInfo.setTimeout(0);
2106            syncSendPacket(destInfo);
2107        }
2108    
2109        public boolean isDeleted(ActiveMQDestination dest) {
2110    
2111            // If we are not watching the advisories.. then
2112            // we will assume that the temp destination does exist.
2113            if (advisoryConsumer == null) {
2114                return false;
2115            }
2116    
2117            return !activeTempDestinations.contains(dest);
2118        }
2119    
2120        public boolean isCopyMessageOnSend() {
2121            return copyMessageOnSend;
2122        }
2123    
2124        public LongSequenceGenerator getLocalTransactionIdGenerator() {
2125            return localTransactionIdGenerator;
2126        }
2127    
2128        public boolean isUseCompression() {
2129            return useCompression;
2130        }
2131    
2132        /**
2133         * Enables the use of compression of the message bodies
2134         */
2135        public void setUseCompression(boolean useCompression) {
2136            this.useCompression = useCompression;
2137        }
2138    
2139        public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2140    
2141            checkClosedOrFailed();
2142            ensureConnectionInfoSent();
2143    
2144            DestinationInfo info = new DestinationInfo();
2145            info.setConnectionId(this.info.getConnectionId());
2146            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2147            info.setDestination(destination);
2148            info.setTimeout(0);
2149            syncSendPacket(info);
2150        }
2151    
2152        public boolean isDispatchAsync() {
2153            return dispatchAsync;
2154        }
2155    
2156        /**
2157         * Enables or disables the default setting of whether or not consumers have
2158         * their messages <a
2159         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2160         * synchronously or asynchronously by the broker</a>. For non-durable
2161         * topics for example we typically dispatch synchronously by default to
2162         * minimize context switches which boost performance. However sometimes its
2163         * better to go slower to ensure that a single blocked consumer socket does
2164         * not block delivery to other consumers.
2165         *
2166         * @param asyncDispatch If true then consumers created on this connection
2167         *                will default to having their messages dispatched
2168         *                asynchronously. The default value is true.
2169         */
2170        public void setDispatchAsync(boolean asyncDispatch) {
2171            this.dispatchAsync = asyncDispatch;
2172        }
2173    
2174        public boolean isObjectMessageSerializationDefered() {
2175            return objectMessageSerializationDefered;
2176        }
2177    
2178        /**
2179         * When an object is set on an ObjectMessage, the JMS spec requires the
2180         * object to be serialized by that set method. Enabling this flag causes the
2181         * object to not get serialized. The object may subsequently get serialized
2182         * if the message needs to be sent over a socket or stored to disk.
2183         */
2184        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2185            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2186        }
2187    
2188        @Override
2189        public InputStream createInputStream(Destination dest) throws JMSException {
2190            return createInputStream(dest, null);
2191        }
2192    
2193        @Override
2194        public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2195            return createInputStream(dest, messageSelector, false);
2196        }
2197    
2198        @Override
2199        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2200            return createInputStream(dest, messageSelector, noLocal,  -1);
2201        }
2202    
2203        @Override
2204        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2205            return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2206        }
2207    
2208        @Override
2209        public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2210            return createInputStream(dest, null, false);
2211        }
2212    
2213        @Override
2214        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2215            return createDurableInputStream(dest, name, messageSelector, false);
2216        }
2217    
2218        @Override
2219        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2220            return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2221        }
2222    
2223        @Override
2224        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2225            return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2226        }
2227    
2228        private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2229            checkClosedOrFailed();
2230            ensureConnectionInfoSent();
2231            return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2232        }
2233    
2234        /**
2235         * Creates a persistent output stream; individual messages will be written
2236         * to disk/database by the broker
2237         */
2238        @Override
2239        public OutputStream createOutputStream(Destination dest) throws JMSException {
2240            return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2241        }
2242    
2243        /**
2244         * Creates a non persistent output stream; messages will not be written to
2245         * disk
2246         */
2247        public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2248            return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2249        }
2250    
2251        /**
2252         * Creates an output stream allowing full control over the delivery mode,
2253         * the priority and time to live of the messages and the properties added to
2254         * messages on the stream.
2255         *
2256         * @param streamProperties defines a map of key-value pairs where the keys
2257         *                are strings and the values are primitive values (numbers
2258         *                and strings) which are appended to the messages similarly
2259         *                to using the
2260         *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2261         *                method
2262         */
2263        @Override
2264        public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2265            checkClosedOrFailed();
2266            ensureConnectionInfoSent();
2267            return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2268        }
2269    
2270        /**
2271         * Unsubscribes a durable subscription that has been created by a client.
2272         * <P>
2273         * This method deletes the state being maintained on behalf of the
2274         * subscriber by its provider.
2275         * <P>
2276         * It is erroneous for a client to delete a durable subscription while there
2277         * is an active <CODE>MessageConsumer </CODE> or
2278         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2279         * message is part of a pending transaction or has not been acknowledged in
2280         * the session.
2281         *
2282         * @param name the name used to identify this subscription
2283         * @throws JMSException if the session fails to unsubscribe to the durable
2284         *                 subscription due to some internal error.
2285         * @throws InvalidDestinationException if an invalid subscription name is
2286         *                 specified.
2287         * @since 1.1
2288         */
2289        @Override
2290        public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2291            checkClosedOrFailed();
2292            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2293            rsi.setConnectionId(getConnectionInfo().getConnectionId());
2294            rsi.setSubscriptionName(name);
2295            rsi.setClientId(getConnectionInfo().getClientId());
2296            syncSendPacket(rsi);
2297        }
2298    
2299        /**
2300         * Internal send method optimized: - It does not copy the message - It can
2301         * only handle ActiveMQ messages. - You can specify if the send is async or
2302         * sync - Does not allow you to send /w a transaction.
2303         */
2304        void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2305            checkClosedOrFailed();
2306    
2307            if (destination.isTemporary() && isDeleted(destination)) {
2308                throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2309            }
2310    
2311            msg.setJMSDestination(destination);
2312            msg.setJMSDeliveryMode(deliveryMode);
2313            long expiration = 0L;
2314    
2315            if (!isDisableTimeStampsByDefault()) {
2316                long timeStamp = System.currentTimeMillis();
2317                msg.setJMSTimestamp(timeStamp);
2318                if (timeToLive > 0) {
2319                    expiration = timeToLive + timeStamp;
2320                }
2321            }
2322    
2323            msg.setJMSExpiration(expiration);
2324            msg.setJMSPriority(priority);
2325            msg.setJMSRedelivered(false);
2326            msg.setMessageId(messageId);
2327            msg.onSend();
2328            msg.setProducerId(msg.getMessageId().getProducerId());
2329    
2330            if (LOG.isDebugEnabled()) {
2331                LOG.debug("Sending message: " + msg);
2332            }
2333    
2334            if (async) {
2335                asyncSendPacket(msg);
2336            } else {
2337                syncSendPacket(msg);
2338            }
2339        }
2340    
2341        public void addOutputStream(ActiveMQOutputStream stream) {
2342            outputStreams.add(stream);
2343        }
2344    
2345        public void removeOutputStream(ActiveMQOutputStream stream) {
2346            outputStreams.remove(stream);
2347        }
2348    
2349        public void addInputStream(ActiveMQInputStream stream) {
2350            inputStreams.add(stream);
2351        }
2352    
2353        public void removeInputStream(ActiveMQInputStream stream) {
2354            inputStreams.remove(stream);
2355        }
2356    
2357        protected void onControlCommand(ControlCommand command) {
2358            String text = command.getCommand();
2359            if (text != null) {
2360                if ("shutdown".equals(text)) {
2361                    LOG.info("JVM told to shutdown");
2362                    System.exit(0);
2363                }
2364    
2365                // TODO Should we handle the "close" case?
2366                // if (false && "close".equals(text)){
2367                //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2368                //     try {
2369                //         close();
2370                //     } catch (JMSException e) {
2371                //     }
2372                // }
2373            }
2374        }
2375    
2376        protected void onConnectionControl(ConnectionControl command) {
2377            if (command.isFaultTolerant()) {
2378                this.optimizeAcknowledge = false;
2379                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2380                    ActiveMQSession s = i.next();
2381                    s.setOptimizeAcknowledge(false);
2382                }
2383            }
2384        }
2385    
2386        protected void onConsumerControl(ConsumerControl command) {
2387            if (command.isClose()) {
2388                for (ActiveMQSession session : this.sessions) {
2389                    session.close(command.getConsumerId());
2390                }
2391            } else {
2392                for (ActiveMQSession session : this.sessions) {
2393                    session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2394                }
2395                for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2396                    ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2397                    if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2398                        consumerInfo.setPrefetchSize(command.getPrefetch());
2399                    }
2400                }
2401            }
2402        }
2403    
2404        protected void transportFailed(IOException error) {
2405            transportFailed.set(true);
2406            if (firstFailureError == null) {
2407                firstFailureError = error;
2408            }
2409        }
2410    
2411        /**
2412         * Should a JMS message be copied to a new JMS Message object as part of the
2413         * send() method in JMS. This is enabled by default to be compliant with the
2414         * JMS specification. You can disable it if you do not mutate JMS messages
2415         * after they are sent for a performance boost
2416         */
2417        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2418            this.copyMessageOnSend = copyMessageOnSend;
2419        }
2420    
2421        @Override
2422        public String toString() {
2423            return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2424        }
2425    
2426        protected BlobTransferPolicy createBlobTransferPolicy() {
2427            return new BlobTransferPolicy();
2428        }
2429    
2430        public int getProtocolVersion() {
2431            return protocolVersion.get();
2432        }
2433    
2434        public int getProducerWindowSize() {
2435            return producerWindowSize;
2436        }
2437    
2438        public void setProducerWindowSize(int producerWindowSize) {
2439            this.producerWindowSize = producerWindowSize;
2440        }
2441    
2442        public void setAuditDepth(int auditDepth) {
2443            connectionAudit.setAuditDepth(auditDepth);
2444        }
2445    
2446        public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2447            connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2448        }
2449    
2450        protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2451            connectionAudit.removeDispatcher(dispatcher);
2452        }
2453    
2454        protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2455            return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2456        }
2457    
2458        protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2459            connectionAudit.rollbackDuplicate(dispatcher, message);
2460        }
2461    
2462        public IOException getFirstFailureError() {
2463            return firstFailureError;
2464        }
2465    
2466        protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2467            if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
2468                LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
2469                signalInterruptionProcessingComplete();
2470            }
2471        }
2472    
2473        protected void transportInterruptionProcessingComplete() {
2474            if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
2475                signalInterruptionProcessingComplete();
2476            }
2477        }
2478    
2479        private void signalInterruptionProcessingComplete() {
2480                if (LOG.isDebugEnabled()) {
2481                    LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
2482                            + " for:" + this.getConnectionInfo().getConnectionId());
2483                }
2484    
2485                FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2486                if (failoverTransport != null) {
2487                    failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2488                    if (LOG.isDebugEnabled()) {
2489                        LOG.debug("notified failover transport (" + failoverTransport
2490                                + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2491                    }
2492                }
2493                transportInterruptionProcessingComplete.set(0);
2494        }
2495    
2496        private void signalInterruptionProcessingNeeded() {
2497            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2498            if (failoverTransport != null) {
2499                failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2500                if (LOG.isDebugEnabled()) {
2501                    LOG.debug("notified failover transport (" + failoverTransport
2502                            + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2503                }
2504            }
2505        }
2506    
2507        /*
2508         * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2509         * will wait to receive re dispatched messages.
2510         * default value is 0 so there is no wait by default.
2511         */
2512        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2513            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2514        }
2515    
2516        public long getConsumerFailoverRedeliveryWaitPeriod() {
2517            return consumerFailoverRedeliveryWaitPeriod;
2518        }
2519    
2520        protected Scheduler getScheduler() throws JMSException {
2521            Scheduler result = scheduler;
2522            if (result == null) {
2523                synchronized (this) {
2524                    result = scheduler;
2525                    if (result == null) {
2526                        checkClosed();
2527                        try {
2528                            result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2529                            scheduler.start();
2530                        } catch(Exception e) {
2531                            throw JMSExceptionSupport.create(e);
2532                        }
2533                    }
2534                }
2535            }
2536            return result;
2537        }
2538    
2539        protected ThreadPoolExecutor getExecutor() {
2540            return this.executor;
2541        }
2542    
2543        /**
2544         * @return the checkForDuplicates
2545         */
2546        public boolean isCheckForDuplicates() {
2547            return this.checkForDuplicates;
2548        }
2549    
2550        /**
2551         * @param checkForDuplicates the checkForDuplicates to set
2552         */
2553        public void setCheckForDuplicates(boolean checkForDuplicates) {
2554            this.checkForDuplicates = checkForDuplicates;
2555        }
2556    
2557        public boolean isTransactedIndividualAck() {
2558            return transactedIndividualAck;
2559        }
2560    
2561        public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2562            this.transactedIndividualAck = transactedIndividualAck;
2563        }
2564    
2565        public boolean isNonBlockingRedelivery() {
2566            return nonBlockingRedelivery;
2567        }
2568    
2569        public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2570            this.nonBlockingRedelivery = nonBlockingRedelivery;
2571        }
2572    
2573        /**
2574         * Removes any TempDestinations that this connection has cached, ignoring
2575         * any exceptions generated because the destination is in use as they should
2576         * not be removed.
2577         * Used from a pooled connection, b/c it will not be explicitly closed.
2578         */
2579        public void cleanUpTempDestinations() {
2580    
2581            if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2582                return;
2583            }
2584    
2585            Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2586                = this.activeTempDestinations.entrySet().iterator();
2587            while(entries.hasNext()) {
2588                ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2589                try {
2590                    // Only delete this temp destination if it was created from this connection. The connection used
2591                    // for the advisory consumer may also have a reference to this temp destination.
2592                    ActiveMQTempDestination dest = entry.getValue();
2593                    String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2594                    if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2595                        this.deleteTempDestination(entry.getValue());
2596                    }
2597                } catch (Exception ex) {
2598                    // the temp dest is in use so it can not be deleted.
2599                    // it is ok to leave it to connection tear down phase
2600                }
2601            }
2602        }
2603    
2604        /**
2605         * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2606         * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2607         */
2608        public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2609            this.redeliveryPolicyMap = redeliveryPolicyMap;
2610        }
2611    
2612        /**
2613         * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2614         * Consumers when dealing with transaction messages that have been rolled back.
2615         *
2616         * @return the redeliveryPolicyMap
2617         */
2618        public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2619            return redeliveryPolicyMap;
2620        }
2621    
2622        public int getMaxThreadPoolSize() {
2623            return maxThreadPoolSize;
2624        }
2625    
2626        public void setMaxThreadPoolSize(int maxThreadPoolSize) {
2627            this.maxThreadPoolSize = maxThreadPoolSize;
2628        }
2629    
2630        /**
2631         * Enable enforcement of QueueConnection semantics.
2632         *
2633         * @return this object, useful for chaining
2634         */
2635        ActiveMQConnection enforceQueueOnlyConnection() {
2636            this.queueOnlyConnection = true;
2637            return this;
2638        }
2639    
2640        public RejectedExecutionHandler getRejectedTaskHandler() {
2641            return rejectedTaskHandler;
2642        }
2643    
2644        public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
2645            this.rejectedTaskHandler = rejectedTaskHandler;
2646        }
2647    
2648        /**
2649         * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2650         * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
2651         * will not do any background Message acknowledgment.
2652         *
2653         * @return the scheduledOptimizedAckInterval
2654         */
2655        public long getOptimizedAckScheduledAckInterval() {
2656            return optimizedAckScheduledAckInterval;
2657        }
2658    
2659        /**
2660         * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2661         * have been configured with optimizeAcknowledge enabled.
2662         *
2663         * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
2664         */
2665        public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2666            this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2667        }
2668    }