001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.io.OutputStream;
022 import java.net.URI;
023 import java.net.URISyntaxException;
024 import java.util.HashMap;
025 import java.util.Iterator;
026 import java.util.Map;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.CopyOnWriteArrayList;
029 import java.util.concurrent.CountDownLatch;
030 import java.util.concurrent.LinkedBlockingQueue;
031 import java.util.concurrent.RejectedExecutionHandler;
032 import java.util.concurrent.ThreadFactory;
033 import java.util.concurrent.ThreadPoolExecutor;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.atomic.AtomicBoolean;
036 import java.util.concurrent.atomic.AtomicInteger;
037
038 import javax.jms.Connection;
039 import javax.jms.ConnectionConsumer;
040 import javax.jms.ConnectionMetaData;
041 import javax.jms.DeliveryMode;
042 import javax.jms.Destination;
043 import javax.jms.ExceptionListener;
044 import javax.jms.IllegalStateException;
045 import javax.jms.InvalidDestinationException;
046 import javax.jms.JMSException;
047 import javax.jms.Queue;
048 import javax.jms.QueueConnection;
049 import javax.jms.QueueSession;
050 import javax.jms.ServerSessionPool;
051 import javax.jms.Session;
052 import javax.jms.Topic;
053 import javax.jms.TopicConnection;
054 import javax.jms.TopicSession;
055 import javax.jms.XAConnection;
056
057 import org.apache.activemq.advisory.DestinationSource;
058 import org.apache.activemq.blob.BlobTransferPolicy;
059 import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
060 import org.apache.activemq.command.ActiveMQDestination;
061 import org.apache.activemq.command.ActiveMQMessage;
062 import org.apache.activemq.command.ActiveMQTempDestination;
063 import org.apache.activemq.command.ActiveMQTempQueue;
064 import org.apache.activemq.command.ActiveMQTempTopic;
065 import org.apache.activemq.command.BrokerInfo;
066 import org.apache.activemq.command.Command;
067 import org.apache.activemq.command.CommandTypes;
068 import org.apache.activemq.command.ConnectionControl;
069 import org.apache.activemq.command.ConnectionError;
070 import org.apache.activemq.command.ConnectionId;
071 import org.apache.activemq.command.ConnectionInfo;
072 import org.apache.activemq.command.ConsumerControl;
073 import org.apache.activemq.command.ConsumerId;
074 import org.apache.activemq.command.ConsumerInfo;
075 import org.apache.activemq.command.ControlCommand;
076 import org.apache.activemq.command.DestinationInfo;
077 import org.apache.activemq.command.ExceptionResponse;
078 import org.apache.activemq.command.Message;
079 import org.apache.activemq.command.MessageDispatch;
080 import org.apache.activemq.command.MessageId;
081 import org.apache.activemq.command.ProducerAck;
082 import org.apache.activemq.command.ProducerId;
083 import org.apache.activemq.command.RemoveInfo;
084 import org.apache.activemq.command.RemoveSubscriptionInfo;
085 import org.apache.activemq.command.Response;
086 import org.apache.activemq.command.SessionId;
087 import org.apache.activemq.command.ShutdownInfo;
088 import org.apache.activemq.command.WireFormatInfo;
089 import org.apache.activemq.management.JMSConnectionStatsImpl;
090 import org.apache.activemq.management.JMSStatsImpl;
091 import org.apache.activemq.management.StatsCapable;
092 import org.apache.activemq.management.StatsImpl;
093 import org.apache.activemq.state.CommandVisitorAdapter;
094 import org.apache.activemq.thread.Scheduler;
095 import org.apache.activemq.thread.TaskRunnerFactory;
096 import org.apache.activemq.transport.FutureResponse;
097 import org.apache.activemq.transport.ResponseCallback;
098 import org.apache.activemq.transport.Transport;
099 import org.apache.activemq.transport.TransportListener;
100 import org.apache.activemq.transport.failover.FailoverTransport;
101 import org.apache.activemq.util.IdGenerator;
102 import org.apache.activemq.util.IntrospectionSupport;
103 import org.apache.activemq.util.JMSExceptionSupport;
104 import org.apache.activemq.util.LongSequenceGenerator;
105 import org.apache.activemq.util.ServiceSupport;
106 import org.apache.activemq.util.ThreadPoolUtils;
107 import org.slf4j.Logger;
108 import org.slf4j.LoggerFactory;
109
110 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
111
// Defaults mirrored from ActiveMQConnectionFactory so they can be referenced through this class.
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
// Initial value of maxThreadPoolSize for every new connection.
// NOTE(review): public, static and non-final, so any code can change it at runtime.
public static int DEFAULT_THREAD_POOL_SIZE = 1000;

private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);

// Temporary destinations currently tracked by this connection; cleared in close().
public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();

// Handed to every session created by createSession().
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync = true;

// Shut down in close() when it has been created.
private TaskRunnerFactory sessionTaskRunner;
// Single-threaded executor created in the constructor and shut down in close().
private final ThreadPoolExecutor executor;

// Connection state variables
// Connection id, client id and credentials; built in the constructor.
private final ConnectionInfo info;
private ExceptionListener exceptionListener;
private ClientInternalExceptionListener clientInternalExceptionListener;
// Once true, setClientID() rejects further calls.
private boolean clientIDSet;
// Once true, setClientID() is rejected and close() sends a remove command to the broker.
private boolean isConnectionInfoSentToBroker;
// Set by both setClientID() and setDefaultClientID().
private boolean userSpecifiedClientID;

// Configuration options variables
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private BlobTransferPolicy blobTransferPolicy;
private RedeliveryPolicyMap redeliveryPolicyMap;
private MessageTransformer transformer;

private boolean disableTimeStampsByDefault;
private boolean optimizedMessageDispatch = true;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private boolean objectMessageSerializationDefered;
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
private long optimizeAcknowledgeTimeOut = 0;
private long optimizedAckScheduledAckInterval = 0;
private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
private boolean alwaysSyncSend;
// Timeout passed to the synchronous remove command sent from close().
// NOTE(review): presumably milliseconds - confirm against doSyncSendPacket.
private int closeTimeout = 15000;
private boolean watchTopicAdvisories = true;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout =0;
private boolean sendAcksAsync=true;
private boolean checkForDuplicates = true;
private boolean queueOnlyConnection = false;

// Collaborators supplied to the constructor.
private final Transport transport;
private final IdGenerator clientIdGenerator;
private final JMSStatsImpl factoryStats;
// Per-connection statistics, created in the constructor over the sessions list.
private final JMSConnectionStatsImpl stats;

// Lifecycle flags: started is toggled by start()/stop(); closing and closed are managed by close().
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
// When set, close() skips the preliminary stop().
private final AtomicBoolean transportFailed = new AtomicBoolean(false);
// Child resources of this connection; the first four lists are disposed element by element in close().
private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();

// Maps ConsumerIds to their ActiveMQDispatcher
private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
// Maps ProducerIds to their ActiveMQMessageProducer
private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
// Feeds getNextSessionId().
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
// Session id built in the constructor from this connection's id and the value -1.
private final SessionId connectionSessionId;
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();

// Disposed and cleared in close().
private AdvisoryConsumer advisoryConsumer;
// NOTE(review): presumably released when the first BrokerInfo arrives - confirm where it is counted down.
private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
private BrokerInfo brokerInfo;
private IOException firstFailureError;
private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;

// Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received.
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
// Wall-clock creation time, taken from System.currentTimeMillis() in the constructor.
private final long timeCreated;
// Duplicate checking on this audit is enabled in the constructor only for fault-tolerant transports.
private final ConnectionAudit connectionAudit = new ConnectionAudit();
// Stopped and cleared in close().
private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner;
protected volatile CountDownLatch transportInterruptionProcessingComplete;
private long consumerFailoverRedeliveryWaitPeriod;
// Stopped in close() when it has been created.
private Scheduler scheduler;
private boolean messagePrioritySupported = true;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;

private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
private RejectedExecutionHandler rejectedTaskHandler = null;
210
/**
 * Constructs an <code>ActiveMQConnection</code> on top of the supplied transport.
 * <p>
 * Besides initialising its own state, the constructor registers this
 * connection as the transport's listener and adds it to the factory
 * statistics.
 *
 * @param transport the transport this connection communicates over
 * @param clientIdGenerator generator retained by the connection for later use
 * @param connectionIdGenerator generator used once, here, to obtain the connection id
 * @param factoryStats factory-level statistics this connection is added to
 * @throws Exception declared by the signature for callers and subclasses
 */
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {

    this.transport = transport;
    this.clientIdGenerator = clientIdGenerator;
    this.factoryStats = factoryStats;

    // Threads are named after the transport and deliberately left as
    // non-daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
    final ThreadFactory executorThreads = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            return new Thread(task, "ActiveMQ Connection Executor: " + transport);
        }
    };

    // A single worker thread fed from an unbounded queue. Core-thread
    // timeout is not enabled, so the 5 second keep-alive does not retire
    // the one core thread while it is idle.
    this.executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), executorThreads);

    // Describe this connection under a freshly generated id.
    final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
    this.info = new ConnectionInfo(connectionId);
    this.info.setManageable(true);
    this.info.setFaultTolerant(transport.isFaultTolerant());
    this.connectionSessionId = new SessionId(this.info.getConnectionId(), -1);

    // From here on transport events are delivered to this connection.
    this.transport.setTransportListener(this);

    final boolean xaConnection = this instanceof XAConnection;
    this.stats = new JMSConnectionStatsImpl(this.sessions, xaConnection);
    this.factoryStats.addConnection(this);
    this.timeCreated = System.currentTimeMillis();
    // Duplicate auditing is switched on only for fault-tolerant transports.
    this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
}
249
250 protected void setUserName(String userName) {
251 this.info.setUserName(userName);
252 }
253
254 protected void setPassword(String password) {
255 this.info.setPassword(password);
256 }
257
258 /**
259 * A static helper method to create a new connection
260 *
261 * @return an ActiveMQConnection
262 * @throws JMSException
263 */
264 public static ActiveMQConnection makeConnection() throws JMSException {
265 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
266 return (ActiveMQConnection)factory.createConnection();
267 }
268
269 /**
270 * A static helper method to create a new connection
271 *
272 * @param uri
273 * @return and ActiveMQConnection
274 * @throws JMSException
275 */
276 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
277 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
278 return (ActiveMQConnection)factory.createConnection();
279 }
280
281 /**
282 * A static helper method to create a new connection
283 *
284 * @param user
285 * @param password
286 * @param uri
287 * @return an ActiveMQConnection
288 * @throws JMSException
289 */
290 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
291 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
292 return (ActiveMQConnection)factory.createConnection();
293 }
294
295 /**
296 * @return a number unique for this connection
297 */
298 public JMSConnectionStatsImpl getConnectionStats() {
299 return stats;
300 }
301
302 /**
303 * Creates a <CODE>Session</CODE> object.
304 *
305 * @param transacted indicates whether the session is transacted
306 * @param acknowledgeMode indicates whether the consumer or the client will
307 * acknowledge any messages it receives; ignored if the
308 * session is transacted. Legal values are
309 * <code>Session.AUTO_ACKNOWLEDGE</code>,
310 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
311 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
312 * @return a newly created session
313 * @throws JMSException if the <CODE>Connection</CODE> object fails to
314 * create a session due to some internal error or lack of
315 * support for the specific transaction and acknowledgement
316 * mode.
317 * @see Session#AUTO_ACKNOWLEDGE
318 * @see Session#CLIENT_ACKNOWLEDGE
319 * @see Session#DUPS_OK_ACKNOWLEDGE
320 * @since 1.1
321 */
322 @Override
323 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
324 checkClosedOrFailed();
325 ensureConnectionInfoSent();
326 if(!transacted) {
327 if (acknowledgeMode==Session.SESSION_TRANSACTED) {
328 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
329 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
330 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
331 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
332 }
333 }
334 return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
335 ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
336 }
337
338 /**
339 * @return sessionId
340 */
341 protected SessionId getNextSessionId() {
342 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
343 }
344
345 /**
346 * Gets the client identifier for this connection.
347 * <P>
348 * This value is specific to the JMS provider. It is either preconfigured by
349 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
350 * dynamically by the application by calling the <code>setClientID</code>
351 * method.
352 *
353 * @return the unique client identifier
354 * @throws JMSException if the JMS provider fails to return the client ID
355 * for this connection due to some internal error.
356 */
357 @Override
358 public String getClientID() throws JMSException {
359 checkClosedOrFailed();
360 return this.info.getClientId();
361 }
362
363 /**
364 * Sets the client identifier for this connection.
365 * <P>
366 * The preferred way to assign a JMS client's client identifier is for it to
367 * be configured in a client-specific <CODE>ConnectionFactory</CODE>
368 * object and transparently assigned to the <CODE>Connection</CODE> object
369 * it creates.
370 * <P>
371 * Alternatively, a client can set a connection's client identifier using a
372 * provider-specific value. The facility to set a connection's client
373 * identifier explicitly is not a mechanism for overriding the identifier
374 * that has been administratively configured. It is provided for the case
375 * where no administratively specified identifier exists. If one does exist,
376 * an attempt to change it by setting it must throw an
377 * <CODE>IllegalStateException</CODE>. If a client sets the client
378 * identifier explicitly, it must do so immediately after it creates the
379 * connection and before any other action on the connection is taken. After
380 * this point, setting the client identifier is a programming error that
381 * should throw an <CODE>IllegalStateException</CODE>.
382 * <P>
383 * The purpose of the client identifier is to associate a connection and its
384 * objects with a state maintained on behalf of the client by a provider.
385 * The only such state identified by the JMS API is that required to support
386 * durable subscriptions.
387 * <P>
388 * If another connection with the same <code>clientID</code> is already
389 * running when this method is called, the JMS provider should detect the
390 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
391 *
392 * @param newClientID the unique client identifier
393 * @throws JMSException if the JMS provider fails to set the client ID for
394 * this connection due to some internal error.
395 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
396 * invalid or duplicate client ID.
397 * @throws javax.jms.IllegalStateException if the JMS client attempts to set
398 * a connection's client ID at the wrong time or when it has
399 * been administratively configured.
400 */
401 @Override
402 public void setClientID(String newClientID) throws JMSException {
403 checkClosedOrFailed();
404
405 if (this.clientIDSet) {
406 throw new IllegalStateException("The clientID has already been set");
407 }
408
409 if (this.isConnectionInfoSentToBroker) {
410 throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
411 }
412
413 this.info.setClientId(newClientID);
414 this.userSpecifiedClientID = true;
415 ensureConnectionInfoSent();
416 }
417
418 /**
419 * Sets the default client id that the connection will use if explicitly not
420 * set with the setClientId() call.
421 */
422 public void setDefaultClientID(String clientID) throws JMSException {
423 this.info.setClientId(clientID);
424 this.userSpecifiedClientID = true;
425 }
426
427 /**
428 * Gets the metadata for this connection.
429 *
430 * @return the connection metadata
431 * @throws JMSException if the JMS provider fails to get the connection
432 * metadata for this connection.
433 * @see javax.jms.ConnectionMetaData
434 */
435 @Override
436 public ConnectionMetaData getMetaData() throws JMSException {
437 checkClosedOrFailed();
438 return ActiveMQConnectionMetaData.INSTANCE;
439 }
440
441 /**
442 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
443 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
444 * associated with it.
445 *
446 * @return the <CODE>ExceptionListener</CODE> for this connection, or
447 * null, if no <CODE>ExceptionListener</CODE> is associated with
448 * this connection.
449 * @throws JMSException if the JMS provider fails to get the
450 * <CODE>ExceptionListener</CODE> for this connection.
451 * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
452 */
453 @Override
454 public ExceptionListener getExceptionListener() throws JMSException {
455 checkClosedOrFailed();
456 return this.exceptionListener;
457 }
458
459 /**
460 * Sets an exception listener for this connection.
461 * <P>
462 * If a JMS provider detects a serious problem with a connection, it informs
463 * the connection's <CODE> ExceptionListener</CODE>, if one has been
464 * registered. It does this by calling the listener's <CODE>onException
465 * </CODE>
466 * method, passing it a <CODE>JMSException</CODE> object describing the
467 * problem.
468 * <P>
469 * An exception listener allows a client to be notified of a problem
470 * asynchronously. Some connections only consume messages, so they would
471 * have no other way to learn their connection has failed.
472 * <P>
473 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
474 * <P>
475 * A JMS provider should attempt to resolve connection problems itself
476 * before it notifies the client of them.
477 *
478 * @param listener the exception listener
479 * @throws JMSException if the JMS provider fails to set the exception
480 * listener for this connection.
481 */
482 @Override
483 public void setExceptionListener(ExceptionListener listener) throws JMSException {
484 checkClosedOrFailed();
485 this.exceptionListener = listener;
486 }
487
488 /**
489 * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
490 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
491 * associated with it.
492 *
493 * @return the listener or <code>null</code> if no listener is registered with the connection.
494 */
495 public ClientInternalExceptionListener getClientInternalExceptionListener() {
496 return clientInternalExceptionListener;
497 }
498
499 /**
500 * Sets a client internal exception listener for this connection.
501 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
502 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
503 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
504 * describing the problem.
505 *
506 * @param listener the exception listener
507 */
508 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
509 this.clientInternalExceptionListener = listener;
510 }
511
512 /**
513 * Starts (or restarts) a connection's delivery of incoming messages. A call
514 * to <CODE>start</CODE> on a connection that has already been started is
515 * ignored.
516 *
517 * @throws JMSException if the JMS provider fails to start message delivery
518 * due to some internal error.
519 * @see javax.jms.Connection#stop()
520 */
521 @Override
522 public void start() throws JMSException {
523 checkClosedOrFailed();
524 ensureConnectionInfoSent();
525 if (started.compareAndSet(false, true)) {
526 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
527 ActiveMQSession session = i.next();
528 session.start();
529 }
530 }
531 }
532
533 /**
534 * Temporarily stops a connection's delivery of incoming messages. Delivery
535 * can be restarted using the connection's <CODE>start</CODE> method. When
536 * the connection is stopped, delivery to all the connection's message
537 * consumers is inhibited: synchronous receives block, and messages are not
538 * delivered to message listeners.
539 * <P>
540 * This call blocks until receives and/or message listeners in progress have
541 * completed.
542 * <P>
543 * Stopping a connection has no effect on its ability to send messages. A
544 * call to <CODE>stop</CODE> on a connection that has already been stopped
545 * is ignored.
546 * <P>
547 * A call to <CODE>stop</CODE> must not return until delivery of messages
548 * has paused. This means that a client can rely on the fact that none of
549 * its message listeners will be called and that all threads of control
550 * waiting for <CODE>receive</CODE> calls to return will not return with a
551 * message until the connection is restarted. The receive timers for a
552 * stopped connection continue to advance, so receives may time out while
553 * the connection is stopped.
554 * <P>
555 * If message listeners are running when <CODE>stop</CODE> is invoked, the
556 * <CODE>stop</CODE> call must wait until all of them have returned before
557 * it may return. While these message listeners are completing, they must
558 * have the full services of the connection available to them.
559 *
560 * @throws JMSException if the JMS provider fails to stop message delivery
561 * due to some internal error.
562 * @see javax.jms.Connection#start()
563 */
564 @Override
565 public void stop() throws JMSException {
566 checkClosedOrFailed();
567 if (started.compareAndSet(true, false)) {
568 synchronized(sessions) {
569 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
570 ActiveMQSession s = i.next();
571 s.stop();
572 }
573 }
574 }
575 }
576
577 /**
578 * Closes the connection.
579 * <P>
580 * Since a provider typically allocates significant resources outside the
581 * JVM on behalf of a connection, clients should close these resources when
582 * they are not needed. Relying on garbage collection to eventually reclaim
583 * these resources may not be timely enough.
584 * <P>
585 * There is no need to close the sessions, producers, and consumers of a
586 * closed connection.
587 * <P>
588 * Closing a connection causes all temporary destinations to be deleted.
589 * <P>
590 * When this method is invoked, it should not return until message
591 * processing has been shut down in an orderly fashion. This means that all
592 * message listeners that may have been running have returned, and that all
593 * pending receives have returned. A close terminates all pending message
594 * receives on the connection's sessions' consumers. The receives may return
595 * with a message or with null, depending on whether there was a message
596 * available at the time of the close. If one or more of the connection's
597 * sessions' message listeners is processing a message at the time when
598 * connection <CODE>close</CODE> is invoked, all the facilities of the
599 * connection and its sessions must remain available to those listeners
600 * until they return control to the JMS provider.
601 * <P>
602 * Closing a connection causes any of its sessions' transactions in progress
603 * to be rolled back. In the case where a session's work is coordinated by
604 * an external transaction manager, a session's <CODE>commit</CODE> and
605 * <CODE> rollback</CODE> methods are not used and the result of a closed
606 * session's work is determined later by the transaction manager. Closing a
607 * connection does NOT force an acknowledgment of client-acknowledged
608 * sessions.
609 * <P>
610 * Invoking the <CODE>acknowledge</CODE> method of a received message from
611 * a closed connection's session must throw an
612 * <CODE>IllegalStateException</CODE>. Closing a closed connection must
613 * NOT throw an exception.
614 *
615 * @throws JMSException if the JMS provider fails to close the connection
616 * due to some internal error. For example, a failure to
617 * release resources or to close a socket connection can
618 * cause this exception to be thrown.
619 */
620 @Override
621 public void close() throws JMSException {
622 // Store the interrupted state and clear so that cleanup happens without
623 // leaking connection resources. Reset in finally to preserve state.
624 boolean interrupted = Thread.interrupted();
625
626 try {
627
628 // If we were running, lets stop first.
629 if (!closed.get() && !transportFailed.get()) {
630 stop();
631 }
632
633 synchronized (this) {
634 if (!closed.get()) {
635 closing.set(true);
636
637 if (destinationSource != null) {
638 destinationSource.stop();
639 destinationSource = null;
640 }
641 if (advisoryConsumer != null) {
642 advisoryConsumer.dispose();
643 advisoryConsumer = null;
644 }
645
646 Scheduler scheduler = this.scheduler;
647 if (scheduler != null) {
648 try {
649 scheduler.stop();
650 } catch (Exception e) {
651 JMSException ex = JMSExceptionSupport.create(e);
652 throw ex;
653 }
654 }
655
656 long lastDeliveredSequenceId = 0;
657 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
658 ActiveMQSession s = i.next();
659 s.dispose();
660 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
661 }
662 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
663 ActiveMQConnectionConsumer c = i.next();
664 c.dispose();
665 }
666 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
667 ActiveMQInputStream c = i.next();
668 c.dispose();
669 }
670 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
671 ActiveMQOutputStream c = i.next();
672 c.dispose();
673 }
674
675 this.activeTempDestinations.clear();
676
677 if (isConnectionInfoSentToBroker) {
678 // If we announced ourselves to the broker.. Try to let the broker
679 // know that the connection is being shutdown.
680 RemoveInfo removeCommand = info.createRemoveCommand();
681 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
682 doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
683 doAsyncSendPacket(new ShutdownInfo());
684 }
685
686 started.set(false);
687
688 // TODO if we move the TaskRunnerFactory to the connection
689 // factory
690 // then we may need to call
691 // factory.onConnectionClose(this);
692 if (sessionTaskRunner != null) {
693 sessionTaskRunner.shutdown();
694 }
695 closed.set(true);
696 closing.set(false);
697 }
698 }
699 } finally {
700 try {
701 if (executor != null) {
702 ThreadPoolUtils.shutdown(executor);
703 }
704 } catch (Throwable e) {
705 LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
706 }
707
708 ServiceSupport.dispose(this.transport);
709
710 factoryStats.removeConnection(this);
711 if (interrupted) {
712 Thread.currentThread().interrupt();
713 }
714 }
715 }
716
717 /**
718 * Tells the broker to terminate its VM. This can be used to cleanly
719 * terminate a broker running in a standalone java process. Server must have
720 * property enable.vm.shutdown=true defined to allow this to work.
721 */
722 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
723 // implemented.
724 /*
725 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
726 * command = new BrokerAdminCommand();
727 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
728 * asyncSendPacket(command); }
729 */
730
731 /**
732 * Create a durable connection consumer for this connection (optional
733 * operation). This is an expert facility not used by regular JMS clients.
734 *
735 * @param topic topic to access
736 * @param subscriptionName durable subscription name
737 * @param messageSelector only messages with properties matching the message
738 * selector expression are delivered. A value of null or an
739 * empty string indicates that there is no message selector
740 * for the message consumer.
741 * @param sessionPool the server session pool to associate with this durable
742 * connection consumer
743 * @param maxMessages the maximum number of messages that can be assigned to
744 * a server session at one time
745 * @return the durable connection consumer
746 * @throws JMSException if the <CODE>Connection</CODE> object fails to
747 * create a connection consumer due to some internal error
748 * or invalid arguments for <CODE>sessionPool</CODE> and
749 * <CODE>messageSelector</CODE>.
750 * @throws javax.jms.InvalidDestinationException if an invalid destination
751 * is specified.
752 * @throws javax.jms.InvalidSelectorException if the message selector is
753 * invalid.
754 * @see javax.jms.ConnectionConsumer
755 * @since 1.1
756 */
757 @Override
758 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
759 throws JMSException {
760 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
761 }
762
763 /**
764 * Create a durable connection consumer for this connection (optional
765 * operation). This is an expert facility not used by regular JMS clients.
766 *
767 * @param topic topic to access
768 * @param subscriptionName durable subscription name
769 * @param messageSelector only messages with properties matching the message
770 * selector expression are delivered. A value of null or an
771 * empty string indicates that there is no message selector
772 * for the message consumer.
773 * @param sessionPool the server session pool to associate with this durable
774 * connection consumer
775 * @param maxMessages the maximum number of messages that can be assigned to
776 * a server session at one time
777 * @param noLocal set true if you want to filter out messages published
778 * locally
779 * @return the durable connection consumer
780 * @throws JMSException if the <CODE>Connection</CODE> object fails to
781 * create a connection consumer due to some internal error
782 * or invalid arguments for <CODE>sessionPool</CODE> and
783 * <CODE>messageSelector</CODE>.
784 * @throws javax.jms.InvalidDestinationException if an invalid destination
785 * is specified.
786 * @throws javax.jms.InvalidSelectorException if the message selector is
787 * invalid.
788 * @see javax.jms.ConnectionConsumer
789 * @since 1.1
790 */
791 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
792 boolean noLocal) throws JMSException {
793 checkClosedOrFailed();
794
795 if (queueOnlyConnection) {
796 throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
797 }
798
799 ensureConnectionInfoSent();
800 SessionId sessionId = new SessionId(info.getConnectionId(), -1);
801 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
802 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
803 info.setSubscriptionName(subscriptionName);
804 info.setSelector(messageSelector);
805 info.setPrefetchSize(maxMessages);
806 info.setDispatchAsync(isDispatchAsync());
807
808 // Allows the options on the destination to configure the consumerInfo
809 if (info.getDestination().getOptions() != null) {
810 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
811 IntrospectionSupport.setProperties(this.info, options, "consumer.");
812 }
813
814 return new ActiveMQConnectionConsumer(this, sessionPool, info);
815 }
816
817 // Properties
818 // -------------------------------------------------------------------------
819
820 /**
821 * Returns true if this connection has been started
822 *
823 * @return true if this Connection is started
824 */
825 public boolean isStarted() {
826 return started.get();
827 }
828
829 /**
830 * Returns true if the connection is closed
831 */
832 public boolean isClosed() {
833 return closed.get();
834 }
835
836 /**
837 * Returns true if the connection is in the process of being closed
838 */
839 public boolean isClosing() {
840 return closing.get();
841 }
842
843 /**
844 * Returns true if the underlying transport has failed
845 */
846 public boolean isTransportFailed() {
847 return transportFailed.get();
848 }
849
850 /**
851 * @return Returns the prefetchPolicy.
852 */
853 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
854 return prefetchPolicy;
855 }
856
857 /**
858 * Sets the <a
859 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
860 * policy</a> for consumers created by this connection.
861 */
862 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
863 this.prefetchPolicy = prefetchPolicy;
864 }
865
866 /**
867 */
868 public Transport getTransportChannel() {
869 return transport;
870 }
871
872 /**
873 * @return Returns the clientID of the connection, forcing one to be
874 * generated if one has not yet been configured.
875 */
876 public String getInitializedClientID() throws JMSException {
877 ensureConnectionInfoSent();
878 return info.getClientId();
879 }
880
881 /**
882 * @return Returns the timeStampsDisableByDefault.
883 */
884 public boolean isDisableTimeStampsByDefault() {
885 return disableTimeStampsByDefault;
886 }
887
888 /**
889 * Sets whether or not timestamps on messages should be disabled or not. If
890 * you disable them it adds a small performance boost.
891 */
892 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
893 this.disableTimeStampsByDefault = timeStampsDisableByDefault;
894 }
895
896 /**
897 * @return Returns the dispatchOptimizedMessage.
898 */
899 public boolean isOptimizedMessageDispatch() {
900 return optimizedMessageDispatch;
901 }
902
903 /**
904 * If this flag is set then an larger prefetch limit is used - only
905 * applicable for durable topic subscribers.
906 */
907 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
908 this.optimizedMessageDispatch = dispatchOptimizedMessage;
909 }
910
911 /**
912 * @return Returns the closeTimeout.
913 */
914 public int getCloseTimeout() {
915 return closeTimeout;
916 }
917
918 /**
919 * Sets the timeout before a close is considered complete. Normally a
920 * close() on a connection waits for confirmation from the broker; this
921 * allows that operation to timeout to save the client hanging if there is
922 * no broker
923 */
924 public void setCloseTimeout(int closeTimeout) {
925 this.closeTimeout = closeTimeout;
926 }
927
928 /**
929 * @return ConnectionInfo
930 */
931 public ConnectionInfo getConnectionInfo() {
932 return this.info;
933 }
934
935 public boolean isUseRetroactiveConsumer() {
936 return useRetroactiveConsumer;
937 }
938
939 /**
940 * Sets whether or not retroactive consumers are enabled. Retroactive
941 * consumers allow non-durable topic subscribers to receive old messages
942 * that were published before the non-durable subscriber started.
943 */
944 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
945 this.useRetroactiveConsumer = useRetroactiveConsumer;
946 }
947
948 public boolean isNestedMapAndListEnabled() {
949 return nestedMapAndListEnabled;
950 }
951
952 /**
953 * Enables/disables whether or not Message properties and MapMessage entries
954 * support <a
955 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
956 * Structures</a> of Map and List objects
957 */
958 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
959 this.nestedMapAndListEnabled = structuredMapsEnabled;
960 }
961
962 public boolean isExclusiveConsumer() {
963 return exclusiveConsumer;
964 }
965
966 /**
967 * Enables or disables whether or not queue consumers should be exclusive or
968 * not for example to preserve ordering when not using <a
969 * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
970 *
971 * @param exclusiveConsumer
972 */
973 public void setExclusiveConsumer(boolean exclusiveConsumer) {
974 this.exclusiveConsumer = exclusiveConsumer;
975 }
976
977 /**
978 * Adds a transport listener so that a client can be notified of events in
979 * the underlying transport
980 */
981 public void addTransportListener(TransportListener transportListener) {
982 transportListeners.add(transportListener);
983 }
984
985 public void removeTransportListener(TransportListener transportListener) {
986 transportListeners.remove(transportListener);
987 }
988
989 public boolean isUseDedicatedTaskRunner() {
990 return useDedicatedTaskRunner;
991 }
992
993 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
994 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
995 }
996
997 public TaskRunnerFactory getSessionTaskRunner() {
998 synchronized (this) {
999 if (sessionTaskRunner == null) {
1000 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1001 sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1002 }
1003 }
1004 return sessionTaskRunner;
1005 }
1006
1007 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1008 this.sessionTaskRunner = sessionTaskRunner;
1009 }
1010
1011 public MessageTransformer getTransformer() {
1012 return transformer;
1013 }
1014
1015 /**
1016 * Sets the transformer used to transform messages before they are sent on
1017 * to the JMS bus or when they are received from the bus but before they are
1018 * delivered to the JMS client
1019 */
1020 public void setTransformer(MessageTransformer transformer) {
1021 this.transformer = transformer;
1022 }
1023
1024 /**
1025 * @return the statsEnabled
1026 */
1027 public boolean isStatsEnabled() {
1028 return this.stats.isEnabled();
1029 }
1030
1031 /**
1032 * @param statsEnabled the statsEnabled to set
1033 */
1034 public void setStatsEnabled(boolean statsEnabled) {
1035 this.stats.setEnabled(statsEnabled);
1036 }
1037
1038 /**
1039 * Returns the {@link DestinationSource} object which can be used to listen to destinations
1040 * being created or destroyed or to enquire about the current destinations available on the broker
1041 *
1042 * @return a lazily created destination source
1043 * @throws JMSException
1044 */
1045 @Override
1046 public DestinationSource getDestinationSource() throws JMSException {
1047 if (destinationSource == null) {
1048 destinationSource = new DestinationSource(this);
1049 destinationSource.start();
1050 }
1051 return destinationSource;
1052 }
1053
1054 // Implementation methods
1055 // -------------------------------------------------------------------------
1056
1057 /**
1058 * Used internally for adding Sessions to the Connection
1059 *
1060 * @param session
1061 * @throws JMSException
1062 * @throws JMSException
1063 */
1064 protected void addSession(ActiveMQSession session) throws JMSException {
1065 this.sessions.add(session);
1066 if (sessions.size() > 1 || session.isTransacted()) {
1067 optimizedMessageDispatch = false;
1068 }
1069 }
1070
1071 /**
1072 * Used interanlly for removing Sessions from a Connection
1073 *
1074 * @param session
1075 */
1076 protected void removeSession(ActiveMQSession session) {
1077 this.sessions.remove(session);
1078 this.removeDispatcher(session);
1079 }
1080
1081 /**
1082 * Add a ConnectionConsumer
1083 *
1084 * @param connectionConsumer
1085 * @throws JMSException
1086 */
1087 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1088 this.connectionConsumers.add(connectionConsumer);
1089 }
1090
1091 /**
1092 * Remove a ConnectionConsumer
1093 *
1094 * @param connectionConsumer
1095 */
1096 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1097 this.connectionConsumers.remove(connectionConsumer);
1098 this.removeDispatcher(connectionConsumer);
1099 }
1100
1101 /**
1102 * Creates a <CODE>TopicSession</CODE> object.
1103 *
1104 * @param transacted indicates whether the session is transacted
1105 * @param acknowledgeMode indicates whether the consumer or the client will
1106 * acknowledge any messages it receives; ignored if the
1107 * session is transacted. Legal values are
1108 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1109 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1110 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1111 * @return a newly created topic session
1112 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1113 * to create a session due to some internal error or lack of
1114 * support for the specific transaction and acknowledgement
1115 * mode.
1116 * @see Session#AUTO_ACKNOWLEDGE
1117 * @see Session#CLIENT_ACKNOWLEDGE
1118 * @see Session#DUPS_OK_ACKNOWLEDGE
1119 */
1120 @Override
1121 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1122 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1123 }
1124
1125 /**
1126 * Creates a connection consumer for this connection (optional operation).
1127 * This is an expert facility not used by regular JMS clients.
1128 *
1129 * @param topic the topic to access
1130 * @param messageSelector only messages with properties matching the message
1131 * selector expression are delivered. A value of null or an
1132 * empty string indicates that there is no message selector
1133 * for the message consumer.
1134 * @param sessionPool the server session pool to associate with this
1135 * connection consumer
1136 * @param maxMessages the maximum number of messages that can be assigned to
1137 * a server session at one time
1138 * @return the connection consumer
1139 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1140 * to create a connection consumer due to some internal
1141 * error or invalid arguments for <CODE>sessionPool</CODE>
1142 * and <CODE>messageSelector</CODE>.
1143 * @throws javax.jms.InvalidDestinationException if an invalid topic is
1144 * specified.
1145 * @throws javax.jms.InvalidSelectorException if the message selector is
1146 * invalid.
1147 * @see javax.jms.ConnectionConsumer
1148 */
1149 @Override
1150 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1151 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1152 }
1153
1154 /**
1155 * Creates a connection consumer for this connection (optional operation).
1156 * This is an expert facility not used by regular JMS clients.
1157 *
1158 * @param queue the queue to access
1159 * @param messageSelector only messages with properties matching the message
1160 * selector expression are delivered. A value of null or an
1161 * empty string indicates that there is no message selector
1162 * for the message consumer.
1163 * @param sessionPool the server session pool to associate with this
1164 * connection consumer
1165 * @param maxMessages the maximum number of messages that can be assigned to
1166 * a server session at one time
1167 * @return the connection consumer
1168 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1169 * to create a connection consumer due to some internal
1170 * error or invalid arguments for <CODE>sessionPool</CODE>
1171 * and <CODE>messageSelector</CODE>.
1172 * @throws javax.jms.InvalidDestinationException if an invalid queue is
1173 * specified.
1174 * @throws javax.jms.InvalidSelectorException if the message selector is
1175 * invalid.
1176 * @see javax.jms.ConnectionConsumer
1177 */
1178 @Override
1179 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1180 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1181 }
1182
1183 /**
1184 * Creates a connection consumer for this connection (optional operation).
1185 * This is an expert facility not used by regular JMS clients.
1186 *
1187 * @param destination the destination to access
1188 * @param messageSelector only messages with properties matching the message
1189 * selector expression are delivered. A value of null or an
1190 * empty string indicates that there is no message selector
1191 * for the message consumer.
1192 * @param sessionPool the server session pool to associate with this
1193 * connection consumer
1194 * @param maxMessages the maximum number of messages that can be assigned to
1195 * a server session at one time
1196 * @return the connection consumer
1197 * @throws JMSException if the <CODE>Connection</CODE> object fails to
1198 * create a connection consumer due to some internal error
1199 * or invalid arguments for <CODE>sessionPool</CODE> and
1200 * <CODE>messageSelector</CODE>.
1201 * @throws javax.jms.InvalidDestinationException if an invalid destination
1202 * is specified.
1203 * @throws javax.jms.InvalidSelectorException if the message selector is
1204 * invalid.
1205 * @see javax.jms.ConnectionConsumer
1206 * @since 1.1
1207 */
1208 @Override
1209 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1210 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1211 }
1212
1213 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1214 throws JMSException {
1215
1216 checkClosedOrFailed();
1217 ensureConnectionInfoSent();
1218
1219 ConsumerId consumerId = createConsumerId();
1220 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1221 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1222 consumerInfo.setSelector(messageSelector);
1223 consumerInfo.setPrefetchSize(maxMessages);
1224 consumerInfo.setNoLocal(noLocal);
1225 consumerInfo.setDispatchAsync(isDispatchAsync());
1226
1227 // Allows the options on the destination to configure the consumerInfo
1228 if (consumerInfo.getDestination().getOptions() != null) {
1229 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1230 IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1231 }
1232
1233 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1234 }
1235
1236 /**
1237 * @return
1238 */
1239 private ConsumerId createConsumerId() {
1240 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1241 }
1242
1243 /**
1244 * @return
1245 */
1246 private ProducerId createProducerId() {
1247 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1248 }
1249
1250 /**
1251 * Creates a <CODE>QueueSession</CODE> object.
1252 *
1253 * @param transacted indicates whether the session is transacted
1254 * @param acknowledgeMode indicates whether the consumer or the client will
1255 * acknowledge any messages it receives; ignored if the
1256 * session is transacted. Legal values are
1257 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1258 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1259 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1260 * @return a newly created queue session
1261 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1262 * to create a session due to some internal error or lack of
1263 * support for the specific transaction and acknowledgement
1264 * mode.
1265 * @see Session#AUTO_ACKNOWLEDGE
1266 * @see Session#CLIENT_ACKNOWLEDGE
1267 * @see Session#DUPS_OK_ACKNOWLEDGE
1268 */
1269 @Override
1270 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1271 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1272 }
1273
1274 /**
1275 * Ensures that the clientID was manually specified and not auto-generated.
1276 * If the clientID was not specified this method will throw an exception.
1277 * This method is used to ensure that the clientID + durableSubscriber name
1278 * are used correctly.
1279 *
1280 * @throws JMSException
1281 */
1282 public void checkClientIDWasManuallySpecified() throws JMSException {
1283 if (!userSpecifiedClientID) {
1284 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1285 }
1286 }
1287
1288 /**
1289 * send a Packet through the Connection - for internal use only
1290 *
1291 * @param command
1292 * @throws JMSException
1293 */
1294 public void asyncSendPacket(Command command) throws JMSException {
1295 if (isClosed()) {
1296 throw new ConnectionClosedException();
1297 } else {
1298 doAsyncSendPacket(command);
1299 }
1300 }
1301
1302 private void doAsyncSendPacket(Command command) throws JMSException {
1303 try {
1304 this.transport.oneway(command);
1305 } catch (IOException e) {
1306 throw JMSExceptionSupport.create(e);
1307 }
1308 }
1309
1310 /**
1311 * Send a packet through a Connection - for internal use only
1312 *
1313 * @param command
1314 * @return
1315 * @throws JMSException
1316 */
1317 public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1318 if(onComplete==null) {
1319 syncSendPacket(command);
1320 } else {
1321 if (isClosed()) {
1322 throw new ConnectionClosedException();
1323 }
1324 try {
1325 this.transport.asyncRequest(command, new ResponseCallback() {
1326 @Override
1327 public void onCompletion(FutureResponse resp) {
1328 Response response;
1329 Throwable exception = null;
1330 try {
1331 response = resp.getResult();
1332 if (response.isException()) {
1333 ExceptionResponse er = (ExceptionResponse)response;
1334 exception = er.getException();
1335 }
1336 } catch (Exception e) {
1337 exception = e;
1338 }
1339 if(exception!=null) {
1340 if ( exception instanceof JMSException) {
1341 onComplete.onException((JMSException) exception);
1342 } else {
1343 if (isClosed()||closing.get()) {
1344 LOG.debug("Received an exception but connection is closing");
1345 }
1346 JMSException jmsEx = null;
1347 try {
1348 jmsEx = JMSExceptionSupport.create(exception);
1349 } catch(Throwable e) {
1350 LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1351 }
1352 // dispose of transport for security exceptions on connection initiation
1353 if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1354 Transport t = transport;
1355 if (null != t){
1356 ServiceSupport.dispose(t);
1357 }
1358 }
1359 if (jmsEx !=null) {
1360 onComplete.onException(jmsEx);
1361 }
1362 }
1363 } else {
1364 onComplete.onSuccess();
1365 }
1366 }
1367 });
1368 } catch (IOException e) {
1369 throw JMSExceptionSupport.create(e);
1370 }
1371 }
1372 }
1373
1374 public Response syncSendPacket(Command command) throws JMSException {
1375 if (isClosed()) {
1376 throw new ConnectionClosedException();
1377 } else {
1378
1379 try {
1380 Response response = (Response)this.transport.request(command);
1381 if (response.isException()) {
1382 ExceptionResponse er = (ExceptionResponse)response;
1383 if (er.getException() instanceof JMSException) {
1384 throw (JMSException)er.getException();
1385 } else {
1386 if (isClosed()||closing.get()) {
1387 LOG.debug("Received an exception but connection is closing");
1388 }
1389 JMSException jmsEx = null;
1390 try {
1391 jmsEx = JMSExceptionSupport.create(er.getException());
1392 } catch(Throwable e) {
1393 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1394 }
1395 //dispose of transport for security exceptions
1396 if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1397 Transport t = this.transport;
1398 if (null != t){
1399 ServiceSupport.dispose(t);
1400 }
1401 }
1402 if (jmsEx !=null) {
1403 throw jmsEx;
1404 }
1405 }
1406 }
1407 return response;
1408 } catch (IOException e) {
1409 throw JMSExceptionSupport.create(e);
1410 }
1411 }
1412 }
1413
1414 /**
1415 * Send a packet through a Connection - for internal use only
1416 *
1417 * @param command
1418 * @return
1419 * @throws JMSException
1420 */
1421 public Response syncSendPacket(Command command, int timeout) throws JMSException {
1422 if (isClosed() || closing.get()) {
1423 throw new ConnectionClosedException();
1424 } else {
1425 return doSyncSendPacket(command, timeout);
1426 }
1427 }
1428
1429 private Response doSyncSendPacket(Command command, int timeout)
1430 throws JMSException {
1431 try {
1432 Response response = (Response) (timeout > 0
1433 ? this.transport.request(command, timeout)
1434 : this.transport.request(command));
1435 if (response != null && response.isException()) {
1436 ExceptionResponse er = (ExceptionResponse)response;
1437 if (er.getException() instanceof JMSException) {
1438 throw (JMSException)er.getException();
1439 } else {
1440 throw JMSExceptionSupport.create(er.getException());
1441 }
1442 }
1443 return response;
1444 } catch (IOException e) {
1445 throw JMSExceptionSupport.create(e);
1446 }
1447 }
1448
1449 /**
1450 * @return statistics for this Connection
1451 */
1452 @Override
1453 public StatsImpl getStats() {
1454 return stats;
1455 }
1456
1457 /**
1458 * simply throws an exception if the Connection is already closed or the
1459 * Transport has failed
1460 *
1461 * @throws JMSException
1462 */
1463 protected synchronized void checkClosedOrFailed() throws JMSException {
1464 checkClosed();
1465 if (transportFailed.get()) {
1466 throw new ConnectionFailedException(firstFailureError);
1467 }
1468 }
1469
1470 /**
1471 * simply throws an exception if the Connection is already closed
1472 *
1473 * @throws JMSException
1474 */
1475 protected synchronized void checkClosed() throws JMSException {
1476 if (closed.get()) {
1477 throw new ConnectionClosedException();
1478 }
1479 }
1480
1481 /**
1482 * Send the ConnectionInfo to the Broker
1483 *
1484 * @throws JMSException
1485 */
1486 protected void ensureConnectionInfoSent() throws JMSException {
1487 synchronized(this.ensureConnectionInfoSentMutex) {
1488 // Can we skip sending the ConnectionInfo packet??
1489 if (isConnectionInfoSentToBroker || closed.get()) {
1490 return;
1491 }
1492 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1493 if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1494 info.setClientId(clientIdGenerator.generateId());
1495 }
1496 syncSendPacket(info.copy());
1497
1498 this.isConnectionInfoSentToBroker = true;
1499 // Add a temp destination advisory consumer so that
1500 // We know what the valid temporary destinations are on the
1501 // broker without having to do an RPC to the broker.
1502
1503 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1504 if (watchTopicAdvisories) {
1505 advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1506 }
1507 }
1508 }
1509
1510 public synchronized boolean isWatchTopicAdvisories() {
1511 return watchTopicAdvisories;
1512 }
1513
1514 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1515 this.watchTopicAdvisories = watchTopicAdvisories;
1516 }
1517
1518 /**
1519 * @return Returns the useAsyncSend.
1520 */
1521 public boolean isUseAsyncSend() {
1522 return useAsyncSend;
1523 }
1524
1525 /**
1526 * Forces the use of <a
1527 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1528 * adds a massive performance boost; but means that the send() method will
1529 * return immediately whether the message has been sent or not which could
1530 * lead to message loss.
1531 */
1532 public void setUseAsyncSend(boolean useAsyncSend) {
1533 this.useAsyncSend = useAsyncSend;
1534 }
1535
1536 /**
1537 * @return true if always sync send messages
1538 */
1539 public boolean isAlwaysSyncSend() {
1540 return this.alwaysSyncSend;
1541 }
1542
1543 /**
1544 * Set true if always require messages to be sync sent
1545 *
1546 * @param alwaysSyncSend
1547 */
1548 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1549 this.alwaysSyncSend = alwaysSyncSend;
1550 }
1551
1552 /**
1553 * @return the messagePrioritySupported
1554 */
1555 public boolean isMessagePrioritySupported() {
1556 return this.messagePrioritySupported;
1557 }
1558
1559 /**
1560 * @param messagePrioritySupported the messagePrioritySupported to set
1561 */
1562 public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1563 this.messagePrioritySupported = messagePrioritySupported;
1564 }
1565
1566 /**
1567 * Cleans up this connection so that it's state is as if the connection was
1568 * just created. This allows the Resource Adapter to clean up a connection
1569 * so that it can be reused without having to close and recreate the
1570 * connection.
1571 */
1572 public void cleanup() throws JMSException {
1573
1574 if (advisoryConsumer != null && !isTransportFailed()) {
1575 advisoryConsumer.dispose();
1576 advisoryConsumer = null;
1577 }
1578
1579 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1580 ActiveMQSession s = i.next();
1581 s.dispose();
1582 }
1583 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1584 ActiveMQConnectionConsumer c = i.next();
1585 c.dispose();
1586 }
1587 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1588 ActiveMQInputStream c = i.next();
1589 c.dispose();
1590 }
1591 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1592 ActiveMQOutputStream c = i.next();
1593 c.dispose();
1594 }
1595
1596 if (isConnectionInfoSentToBroker) {
1597 if (!transportFailed.get() && !closing.get()) {
1598 syncSendPacket(info.createRemoveCommand());
1599 }
1600 isConnectionInfoSentToBroker = false;
1601 }
1602 if (userSpecifiedClientID) {
1603 info.setClientId(null);
1604 userSpecifiedClientID = false;
1605 }
1606 clientIDSet = false;
1607
1608 started.set(false);
1609 }
1610
1611 @Override
1612 public void finalize() throws Throwable{
1613 Scheduler s = this.scheduler;
1614 if (s != null){
1615 s.stop();
1616 }
1617 }
1618
1619 /**
1620 * Changes the associated username/password that is associated with this
1621 * connection. If the connection has been used, you must called cleanup()
1622 * before calling this method.
1623 *
1624 * @throws IllegalStateException if the connection is in used.
1625 */
1626 public void changeUserInfo(String userName, String password) throws JMSException {
1627 if (isConnectionInfoSentToBroker) {
1628 throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1629 }
1630 this.info.setUserName(userName);
1631 this.info.setPassword(password);
1632 }
1633
1634 /**
1635 * @return Returns the resourceManagerId.
1636 * @throws JMSException
1637 */
1638 public String getResourceManagerId() throws JMSException {
1639 waitForBrokerInfo();
1640 if (brokerInfo == null) {
1641 throw new JMSException("Connection failed before Broker info was received.");
1642 }
1643 return brokerInfo.getBrokerId().getValue();
1644 }
1645
1646 /**
1647 * Returns the broker name if one is available or null if one is not
1648 * available yet.
1649 */
1650 public String getBrokerName() {
1651 try {
1652 brokerInfoReceived.await(5, TimeUnit.SECONDS);
1653 if (brokerInfo == null) {
1654 return null;
1655 }
1656 return brokerInfo.getBrokerName();
1657 } catch (InterruptedException e) {
1658 Thread.currentThread().interrupt();
1659 return null;
1660 }
1661 }
1662
1663 /**
1664 * Returns the broker information if it is available or null if it is not
1665 * available yet.
1666 */
1667 public BrokerInfo getBrokerInfo() {
1668 return brokerInfo;
1669 }
1670
1671 /**
1672 * @return Returns the RedeliveryPolicy.
1673 * @throws JMSException
1674 */
1675 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1676 return redeliveryPolicyMap.getDefaultEntry();
1677 }
1678
1679 /**
1680 * Sets the redelivery policy to be used when messages are rolled back
1681 */
1682 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1683 this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1684 }
1685
1686 public BlobTransferPolicy getBlobTransferPolicy() {
1687 if (blobTransferPolicy == null) {
1688 blobTransferPolicy = createBlobTransferPolicy();
1689 }
1690 return blobTransferPolicy;
1691 }
1692
1693 /**
1694 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1695 * OBjects) are transferred from producers to brokers to consumers
1696 */
1697 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1698 this.blobTransferPolicy = blobTransferPolicy;
1699 }
1700
1701 /**
1702 * @return Returns the alwaysSessionAsync.
1703 */
1704 public boolean isAlwaysSessionAsync() {
1705 return alwaysSessionAsync;
1706 }
1707
1708 /**
1709 * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1710 * the Connection. However, a separate thread is always used if there is more than one session, or the session
1711 * isn't in auto acknowledge or duplicates ok mode. By default this value is set to true and session dispatch
1712 * happens asynchronously.
1713 */
1714 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1715 this.alwaysSessionAsync = alwaysSessionAsync;
1716 }
1717
1718 /**
1719 * @return Returns the optimizeAcknowledge.
1720 */
1721 public boolean isOptimizeAcknowledge() {
1722 return optimizeAcknowledge;
1723 }
1724
1725 /**
1726 * Enables an optimised acknowledgement mode where messages are acknowledged
1727 * in batches rather than individually
1728 *
1729 * @param optimizeAcknowledge The optimizeAcknowledge to set.
1730 */
1731 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1732 this.optimizeAcknowledge = optimizeAcknowledge;
1733 }
1734
1735 /**
1736 * The max time in milliseconds between optimized ack batches
1737 * @param optimizeAcknowledgeTimeOut
1738 */
1739 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1740 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
1741 }
1742
1743 public long getOptimizeAcknowledgeTimeOut() {
1744 return optimizeAcknowledgeTimeOut;
1745 }
1746
1747 public long getWarnAboutUnstartedConnectionTimeout() {
1748 return warnAboutUnstartedConnectionTimeout;
1749 }
1750
1751 /**
1752 * Enables the timeout from a connection creation to when a warning is
1753 * generated if the connection is not properly started via {@link #start()}
1754 * and a message is received by a consumer. It is a very common gotcha to
1755 * forget to <a
1756 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1757 * the connection</a> so this option makes the default case to create a
1758 * warning if the user forgets. To disable the warning just set the value to <
1759 * 0 (say -1).
1760 */
1761 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1762 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1763 }
1764
1765 /**
1766 * @return the sendTimeout
1767 */
1768 public int getSendTimeout() {
1769 return sendTimeout;
1770 }
1771
1772 /**
1773 * @param sendTimeout the sendTimeout to set
1774 */
1775 public void setSendTimeout(int sendTimeout) {
1776 this.sendTimeout = sendTimeout;
1777 }
1778
1779 /**
1780 * @return the sendAcksAsync
1781 */
1782 public boolean isSendAcksAsync() {
1783 return sendAcksAsync;
1784 }
1785
1786 /**
1787 * @param sendAcksAsync the sendAcksAsync to set
1788 */
1789 public void setSendAcksAsync(boolean sendAcksAsync) {
1790 this.sendAcksAsync = sendAcksAsync;
1791 }
1792
1793 /**
1794 * Returns the time this connection was created
1795 */
1796 public long getTimeCreated() {
1797 return timeCreated;
1798 }
1799
1800 private void waitForBrokerInfo() throws JMSException {
1801 try {
1802 brokerInfoReceived.await();
1803 } catch (InterruptedException e) {
1804 Thread.currentThread().interrupt();
1805 throw JMSExceptionSupport.create(e);
1806 }
1807 }
1808
1809 // Package protected so that it can be used in unit tests
1810 public Transport getTransport() {
1811 return transport;
1812 }
1813
1814 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1815 producers.put(producerId, producer);
1816 }
1817
1818 public void removeProducer(ProducerId producerId) {
1819 producers.remove(producerId);
1820 }
1821
1822 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1823 dispatchers.put(consumerId, dispatcher);
1824 }
1825
1826 public void removeDispatcher(ConsumerId consumerId) {
1827 dispatchers.remove(consumerId);
1828 }
1829
1830 /**
1831 * @param o - the command to consume
1832 */
1833 @Override
1834 public void onCommand(final Object o) {
1835 final Command command = (Command)o;
1836 if (!closed.get() && command != null) {
1837 try {
1838 command.visit(new CommandVisitorAdapter() {
1839 @Override
1840 public Response processMessageDispatch(MessageDispatch md) throws Exception {
1841 waitForTransportInterruptionProcessingToComplete();
1842 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1843 if (dispatcher != null) {
1844 // Copy in case a embedded broker is dispatching via
1845 // vm://
1846 // md.getMessage() == null to signal end of queue
1847 // browse.
1848 Message msg = md.getMessage();
1849 if (msg != null) {
1850 msg = msg.copy();
1851 msg.setReadOnlyBody(true);
1852 msg.setReadOnlyProperties(true);
1853 msg.setRedeliveryCounter(md.getRedeliveryCounter());
1854 msg.setConnection(ActiveMQConnection.this);
1855 msg.setMemoryUsage(null);
1856 md.setMessage(msg);
1857 }
1858 dispatcher.dispatch(md);
1859 }
1860 return null;
1861 }
1862
1863 @Override
1864 public Response processProducerAck(ProducerAck pa) throws Exception {
1865 if (pa != null && pa.getProducerId() != null) {
1866 ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1867 if (producer != null) {
1868 producer.onProducerAck(pa);
1869 }
1870 }
1871 return null;
1872 }
1873
1874 @Override
1875 public Response processBrokerInfo(BrokerInfo info) throws Exception {
1876 brokerInfo = info;
1877 brokerInfoReceived.countDown();
1878 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1879 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1880 return null;
1881 }
1882
1883 @Override
1884 public Response processConnectionError(final ConnectionError error) throws Exception {
1885 executor.execute(new Runnable() {
1886 @Override
1887 public void run() {
1888 onAsyncException(error.getException());
1889 }
1890 });
1891 return null;
1892 }
1893
1894 @Override
1895 public Response processControlCommand(ControlCommand command) throws Exception {
1896 onControlCommand(command);
1897 return null;
1898 }
1899
1900 @Override
1901 public Response processConnectionControl(ConnectionControl control) throws Exception {
1902 onConnectionControl((ConnectionControl)command);
1903 return null;
1904 }
1905
1906 @Override
1907 public Response processConsumerControl(ConsumerControl control) throws Exception {
1908 onConsumerControl((ConsumerControl)command);
1909 return null;
1910 }
1911
1912 @Override
1913 public Response processWireFormat(WireFormatInfo info) throws Exception {
1914 onWireFormatInfo((WireFormatInfo)command);
1915 return null;
1916 }
1917 });
1918 } catch (Exception e) {
1919 onClientInternalException(e);
1920 }
1921 }
1922
1923 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1924 TransportListener listener = iter.next();
1925 listener.onCommand(command);
1926 }
1927 }
1928
1929 protected void onWireFormatInfo(WireFormatInfo info) {
1930 protocolVersion.set(info.getVersion());
1931 }
1932
1933 /**
1934 * Handles async client internal exceptions.
1935 * A client internal exception is usually one that has been thrown
1936 * by a container runtime component during asynchronous processing of a
1937 * message that does not affect the connection itself.
1938 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1939 * its <code>onException</code> method, if one has been registered with this connection.
1940 *
1941 * @param error the exception that the problem
1942 */
1943 public void onClientInternalException(final Throwable error) {
1944 if ( !closed.get() && !closing.get() ) {
1945 if ( this.clientInternalExceptionListener != null ) {
1946 executor.execute(new Runnable() {
1947 @Override
1948 public void run() {
1949 ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1950 }
1951 });
1952 } else {
1953 LOG.debug("Async client internal exception occurred with no exception listener registered: "
1954 + error, error);
1955 }
1956 }
1957 }
1958
1959 /**
1960 * Used for handling async exceptions
1961 *
1962 * @param error
1963 */
1964 public void onAsyncException(Throwable error) {
1965 if (!closed.get() && !closing.get()) {
1966 if (this.exceptionListener != null) {
1967
1968 if (!(error instanceof JMSException)) {
1969 error = JMSExceptionSupport.create(error);
1970 }
1971 final JMSException e = (JMSException)error;
1972
1973 executor.execute(new Runnable() {
1974 @Override
1975 public void run() {
1976 ActiveMQConnection.this.exceptionListener.onException(e);
1977 }
1978 });
1979
1980 } else {
1981 LOG.debug("Async exception with no exception listener: " + error, error);
1982 }
1983 }
1984 }
1985
1986 @Override
1987 public void onException(final IOException error) {
1988 onAsyncException(error);
1989 if (!closing.get() && !closed.get()) {
1990 executor.execute(new Runnable() {
1991 @Override
1992 public void run() {
1993 transportFailed(error);
1994 ServiceSupport.dispose(ActiveMQConnection.this.transport);
1995 brokerInfoReceived.countDown();
1996 try {
1997 cleanup();
1998 } catch (JMSException e) {
1999 LOG.warn("Exception during connection cleanup, " + e, e);
2000 }
2001 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2002 TransportListener listener = iter.next();
2003 listener.onException(error);
2004 }
2005 }
2006 });
2007 }
2008 }
2009
2010 @Override
2011 public void transportInterupted() {
2012 this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
2013 if (LOG.isDebugEnabled()) {
2014 LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
2015 }
2016 signalInterruptionProcessingNeeded();
2017
2018 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2019 ActiveMQSession s = i.next();
2020 s.clearMessagesInProgress();
2021 }
2022
2023 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
2024 connectionConsumer.clearMessagesInProgress();
2025 }
2026
2027 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2028 TransportListener listener = iter.next();
2029 listener.transportInterupted();
2030 }
2031 }
2032
2033 @Override
2034 public void transportResumed() {
2035 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2036 TransportListener listener = iter.next();
2037 listener.transportResumed();
2038 }
2039 }
2040
2041 /**
2042 * Create the DestinationInfo object for the temporary destination.
2043 *
2044 * @param topic - if its true topic, else queue.
2045 * @return DestinationInfo
2046 * @throws JMSException
2047 */
2048 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2049
2050 // Check if Destination info is of temporary type.
2051 ActiveMQTempDestination dest;
2052 if (topic) {
2053 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2054 } else {
2055 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2056 }
2057
2058 DestinationInfo info = new DestinationInfo();
2059 info.setConnectionId(this.info.getConnectionId());
2060 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2061 info.setDestination(dest);
2062 syncSendPacket(info);
2063
2064 dest.setConnection(this);
2065 activeTempDestinations.put(dest, dest);
2066 return dest;
2067 }
2068
2069 /**
2070 * @param destination
2071 * @throws JMSException
2072 */
2073 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2074
2075 checkClosedOrFailed();
2076
2077 for (ActiveMQSession session : this.sessions) {
2078 if (session.isInUse(destination)) {
2079 throw new JMSException("A consumer is consuming from the temporary destination");
2080 }
2081 }
2082
2083 activeTempDestinations.remove(destination);
2084
2085 DestinationInfo destInfo = new DestinationInfo();
2086 destInfo.setConnectionId(this.info.getConnectionId());
2087 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2088 destInfo.setDestination(destination);
2089 destInfo.setTimeout(0);
2090 syncSendPacket(destInfo);
2091 }
2092
2093 public boolean isDeleted(ActiveMQDestination dest) {
2094
2095 // If we are not watching the advisories.. then
2096 // we will assume that the temp destination does exist.
2097 if (advisoryConsumer == null) {
2098 return false;
2099 }
2100
2101 return !activeTempDestinations.contains(dest);
2102 }
2103
2104 public boolean isCopyMessageOnSend() {
2105 return copyMessageOnSend;
2106 }
2107
2108 public LongSequenceGenerator getLocalTransactionIdGenerator() {
2109 return localTransactionIdGenerator;
2110 }
2111
2112 public boolean isUseCompression() {
2113 return useCompression;
2114 }
2115
2116 /**
2117 * Enables the use of compression of the message bodies
2118 */
2119 public void setUseCompression(boolean useCompression) {
2120 this.useCompression = useCompression;
2121 }
2122
2123 public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2124
2125 checkClosedOrFailed();
2126 ensureConnectionInfoSent();
2127
2128 DestinationInfo info = new DestinationInfo();
2129 info.setConnectionId(this.info.getConnectionId());
2130 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2131 info.setDestination(destination);
2132 info.setTimeout(0);
2133 syncSendPacket(info);
2134 }
2135
2136 public boolean isDispatchAsync() {
2137 return dispatchAsync;
2138 }
2139
2140 /**
2141 * Enables or disables the default setting of whether or not consumers have
2142 * their messages <a
2143 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2144 * synchronously or asynchronously by the broker</a>. For non-durable
2145 * topics for example we typically dispatch synchronously by default to
2146 * minimize context switches which boost performance. However sometimes its
2147 * better to go slower to ensure that a single blocked consumer socket does
2148 * not block delivery to other consumers.
2149 *
2150 * @param asyncDispatch If true then consumers created on this connection
2151 * will default to having their messages dispatched
2152 * asynchronously. The default value is true.
2153 */
2154 public void setDispatchAsync(boolean asyncDispatch) {
2155 this.dispatchAsync = asyncDispatch;
2156 }
2157
2158 public boolean isObjectMessageSerializationDefered() {
2159 return objectMessageSerializationDefered;
2160 }
2161
2162 /**
2163 * When an object is set on an ObjectMessage, the JMS spec requires the
2164 * object to be serialized by that set method. Enabling this flag causes the
2165 * object to not get serialized. The object may subsequently get serialized
2166 * if the message needs to be sent over a socket or stored to disk.
2167 */
2168 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2169 this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2170 }
2171
2172 @Override
2173 public InputStream createInputStream(Destination dest) throws JMSException {
2174 return createInputStream(dest, null);
2175 }
2176
2177 @Override
2178 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2179 return createInputStream(dest, messageSelector, false);
2180 }
2181
2182 @Override
2183 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2184 return createInputStream(dest, messageSelector, noLocal, -1);
2185 }
2186
2187 @Override
2188 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2189 return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2190 }
2191
2192 @Override
2193 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2194 return createInputStream(dest, null, false);
2195 }
2196
2197 @Override
2198 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2199 return createDurableInputStream(dest, name, messageSelector, false);
2200 }
2201
2202 @Override
2203 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2204 return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2205 }
2206
2207 @Override
2208 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2209 return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2210 }
2211
2212 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2213 checkClosedOrFailed();
2214 ensureConnectionInfoSent();
2215 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2216 }
2217
2218 /**
2219 * Creates a persistent output stream; individual messages will be written
2220 * to disk/database by the broker
2221 */
2222 @Override
2223 public OutputStream createOutputStream(Destination dest) throws JMSException {
2224 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2225 }
2226
2227 /**
2228 * Creates a non persistent output stream; messages will not be written to
2229 * disk
2230 */
2231 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2232 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2233 }
2234
2235 /**
2236 * Creates an output stream allowing full control over the delivery mode,
2237 * the priority and time to live of the messages and the properties added to
2238 * messages on the stream.
2239 *
2240 * @param streamProperties defines a map of key-value pairs where the keys
2241 * are strings and the values are primitive values (numbers
2242 * and strings) which are appended to the messages similarly
2243 * to using the
2244 * {@link javax.jms.Message#setObjectProperty(String, Object)}
2245 * method
2246 */
2247 @Override
2248 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2249 checkClosedOrFailed();
2250 ensureConnectionInfoSent();
2251 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2252 }
2253
2254 /**
2255 * Unsubscribes a durable subscription that has been created by a client.
2256 * <P>
2257 * This method deletes the state being maintained on behalf of the
2258 * subscriber by its provider.
2259 * <P>
2260 * It is erroneous for a client to delete a durable subscription while there
2261 * is an active <CODE>MessageConsumer </CODE> or
2262 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2263 * message is part of a pending transaction or has not been acknowledged in
2264 * the session.
2265 *
2266 * @param name the name used to identify this subscription
2267 * @throws JMSException if the session fails to unsubscribe to the durable
2268 * subscription due to some internal error.
2269 * @throws InvalidDestinationException if an invalid subscription name is
2270 * specified.
2271 * @since 1.1
2272 */
2273 @Override
2274 public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2275 checkClosedOrFailed();
2276 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2277 rsi.setConnectionId(getConnectionInfo().getConnectionId());
2278 rsi.setSubscriptionName(name);
2279 rsi.setClientId(getConnectionInfo().getClientId());
2280 syncSendPacket(rsi);
2281 }
2282
2283 /**
2284 * Internal send method optimized: - It does not copy the message - It can
2285 * only handle ActiveMQ messages. - You can specify if the send is async or
2286 * sync - Does not allow you to send /w a transaction.
2287 */
2288 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2289 checkClosedOrFailed();
2290
2291 if (destination.isTemporary() && isDeleted(destination)) {
2292 throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2293 }
2294
2295 msg.setJMSDestination(destination);
2296 msg.setJMSDeliveryMode(deliveryMode);
2297 long expiration = 0L;
2298
2299 if (!isDisableTimeStampsByDefault()) {
2300 long timeStamp = System.currentTimeMillis();
2301 msg.setJMSTimestamp(timeStamp);
2302 if (timeToLive > 0) {
2303 expiration = timeToLive + timeStamp;
2304 }
2305 }
2306
2307 msg.setJMSExpiration(expiration);
2308 msg.setJMSPriority(priority);
2309 msg.setJMSRedelivered(false);
2310 msg.setMessageId(messageId);
2311 msg.onSend();
2312 msg.setProducerId(msg.getMessageId().getProducerId());
2313
2314 if (LOG.isDebugEnabled()) {
2315 LOG.debug("Sending message: " + msg);
2316 }
2317
2318 if (async) {
2319 asyncSendPacket(msg);
2320 } else {
2321 syncSendPacket(msg);
2322 }
2323 }
2324
2325 public void addOutputStream(ActiveMQOutputStream stream) {
2326 outputStreams.add(stream);
2327 }
2328
2329 public void removeOutputStream(ActiveMQOutputStream stream) {
2330 outputStreams.remove(stream);
2331 }
2332
2333 public void addInputStream(ActiveMQInputStream stream) {
2334 inputStreams.add(stream);
2335 }
2336
2337 public void removeInputStream(ActiveMQInputStream stream) {
2338 inputStreams.remove(stream);
2339 }
2340
2341 protected void onControlCommand(ControlCommand command) {
2342 String text = command.getCommand();
2343 if (text != null) {
2344 if ("shutdown".equals(text)) {
2345 LOG.info("JVM told to shutdown");
2346 System.exit(0);
2347 }
2348
2349 // TODO Should we handle the "close" case?
2350 // if (false && "close".equals(text)){
2351 // LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2352 // try {
2353 // close();
2354 // } catch (JMSException e) {
2355 // }
2356 // }
2357 }
2358 }
2359
2360 protected void onConnectionControl(ConnectionControl command) {
2361 if (command.isFaultTolerant()) {
2362 this.optimizeAcknowledge = false;
2363 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2364 ActiveMQSession s = i.next();
2365 s.setOptimizeAcknowledge(false);
2366 }
2367 }
2368 }
2369
2370 protected void onConsumerControl(ConsumerControl command) {
2371 if (command.isClose()) {
2372 for (ActiveMQSession session : this.sessions) {
2373 session.close(command.getConsumerId());
2374 }
2375 } else {
2376 for (ActiveMQSession session : this.sessions) {
2377 session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2378 }
2379 for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2380 ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2381 if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2382 consumerInfo.setPrefetchSize(command.getPrefetch());
2383 }
2384 }
2385 }
2386 }
2387
2388 protected void transportFailed(IOException error) {
2389 transportFailed.set(true);
2390 if (firstFailureError == null) {
2391 firstFailureError = error;
2392 }
2393 }
2394
2395 /**
2396 * Should a JMS message be copied to a new JMS Message object as part of the
2397 * send() method in JMS. This is enabled by default to be compliant with the
2398 * JMS specification. You can disable it if you do not mutate JMS messages
2399 * after they are sent for a performance boost
2400 */
2401 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2402 this.copyMessageOnSend = copyMessageOnSend;
2403 }
2404
2405 @Override
2406 public String toString() {
2407 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2408 }
2409
2410 protected BlobTransferPolicy createBlobTransferPolicy() {
2411 return new BlobTransferPolicy();
2412 }
2413
2414 public int getProtocolVersion() {
2415 return protocolVersion.get();
2416 }
2417
2418 public int getProducerWindowSize() {
2419 return producerWindowSize;
2420 }
2421
2422 public void setProducerWindowSize(int producerWindowSize) {
2423 this.producerWindowSize = producerWindowSize;
2424 }
2425
2426 public void setAuditDepth(int auditDepth) {
2427 connectionAudit.setAuditDepth(auditDepth);
2428 }
2429
2430 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2431 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2432 }
2433
2434 protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2435 connectionAudit.removeDispatcher(dispatcher);
2436 }
2437
2438 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2439 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2440 }
2441
2442 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2443 connectionAudit.rollbackDuplicate(dispatcher, message);
2444 }
2445
2446 public IOException getFirstFailureError() {
2447 return firstFailureError;
2448 }
2449
2450 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2451 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2452 if (cdl != null) {
2453 if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2454 LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2455 cdl.await(10, TimeUnit.SECONDS);
2456 }
2457 signalInterruptionProcessingComplete();
2458 }
2459 }
2460
2461 protected void transportInterruptionProcessingComplete() {
2462 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2463 if (cdl != null) {
2464 cdl.countDown();
2465 try {
2466 signalInterruptionProcessingComplete();
2467 } catch (InterruptedException ignored) {}
2468 }
2469 }
2470
2471 private void signalInterruptionProcessingComplete() throws InterruptedException {
2472 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2473 if (cdl.getCount()==0) {
2474 if (LOG.isDebugEnabled()) {
2475 LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2476 }
2477 this.transportInterruptionProcessingComplete = null;
2478
2479 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2480 if (failoverTransport != null) {
2481 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2482 if (LOG.isDebugEnabled()) {
2483 LOG.debug("notified failover transport (" + failoverTransport
2484 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2485 }
2486 }
2487
2488 }
2489 }
2490
2491 private void signalInterruptionProcessingNeeded() {
2492 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2493 if (failoverTransport != null) {
2494 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2495 if (LOG.isDebugEnabled()) {
2496 LOG.debug("notified failover transport (" + failoverTransport
2497 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2498 }
2499 }
2500 }
2501
2502 /*
2503 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2504 * will wait to receive re dispatched messages.
2505 * default value is 0 so there is no wait by default.
2506 */
2507 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2508 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2509 }
2510
2511 public long getConsumerFailoverRedeliveryWaitPeriod() {
2512 return consumerFailoverRedeliveryWaitPeriod;
2513 }
2514
2515 protected Scheduler getScheduler() throws JMSException {
2516 Scheduler result = scheduler;
2517 if (result == null) {
2518 synchronized (this) {
2519 result = scheduler;
2520 if (result == null) {
2521 checkClosed();
2522 try {
2523 result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2524 scheduler.start();
2525 } catch(Exception e) {
2526 throw JMSExceptionSupport.create(e);
2527 }
2528 }
2529 }
2530 }
2531 return result;
2532 }
2533
2534 protected ThreadPoolExecutor getExecutor() {
2535 return this.executor;
2536 }
2537
2538 /**
2539 * @return the checkForDuplicates
2540 */
2541 public boolean isCheckForDuplicates() {
2542 return this.checkForDuplicates;
2543 }
2544
2545 /**
2546 * @param checkForDuplicates the checkForDuplicates to set
2547 */
2548 public void setCheckForDuplicates(boolean checkForDuplicates) {
2549 this.checkForDuplicates = checkForDuplicates;
2550 }
2551
2552 public boolean isTransactedIndividualAck() {
2553 return transactedIndividualAck;
2554 }
2555
2556 public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2557 this.transactedIndividualAck = transactedIndividualAck;
2558 }
2559
2560 public boolean isNonBlockingRedelivery() {
2561 return nonBlockingRedelivery;
2562 }
2563
2564 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2565 this.nonBlockingRedelivery = nonBlockingRedelivery;
2566 }
2567
2568 /**
2569 * Removes any TempDestinations that this connection has cached, ignoring
2570 * any exceptions generated because the destination is in use as they should
2571 * not be removed.
2572 * Used from a pooled connection, b/c it will not be explicitly closed.
2573 */
2574 public void cleanUpTempDestinations() {
2575
2576 if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2577 return;
2578 }
2579
2580 Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2581 = this.activeTempDestinations.entrySet().iterator();
2582 while(entries.hasNext()) {
2583 ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2584 try {
2585 // Only delete this temp destination if it was created from this connection. The connection used
2586 // for the advisory consumer may also have a reference to this temp destination.
2587 ActiveMQTempDestination dest = entry.getValue();
2588 String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2589 if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2590 this.deleteTempDestination(entry.getValue());
2591 }
2592 } catch (Exception ex) {
2593 // the temp dest is in use so it can not be deleted.
2594 // it is ok to leave it to connection tear down phase
2595 }
2596 }
2597 }
2598
2599 /**
2600 * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2601 * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2602 */
2603 public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2604 this.redeliveryPolicyMap = redeliveryPolicyMap;
2605 }
2606
2607 /**
2608 * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2609 * Consumers when dealing with transaction messages that have been rolled back.
2610 *
2611 * @return the redeliveryPolicyMap
2612 */
2613 public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2614 return redeliveryPolicyMap;
2615 }
2616
2617 public int getMaxThreadPoolSize() {
2618 return maxThreadPoolSize;
2619 }
2620
2621 public void setMaxThreadPoolSize(int maxThreadPoolSize) {
2622 this.maxThreadPoolSize = maxThreadPoolSize;
2623 }
2624
2625 /**
2626 * Enable enforcement of QueueConnection semantics.
2627 *
2628 * @return this object, useful for chaining
2629 */
2630 ActiveMQConnection enforceQueueOnlyConnection() {
2631 this.queueOnlyConnection = true;
2632 return this;
2633 }
2634
2635 public RejectedExecutionHandler getRejectedTaskHandler() {
2636 return rejectedTaskHandler;
2637 }
2638
2639 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
2640 this.rejectedTaskHandler = rejectedTaskHandler;
2641 }
2642
2643 /**
2644 * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2645 * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers
2646 * will not do any background Message acknowledgment.
2647 *
2648 * @return the scheduledOptimizedAckInterval
2649 */
2650 public long getOptimizedAckScheduledAckInterval() {
2651 return optimizedAckScheduledAckInterval;
2652 }
2653
2654 /**
2655 * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2656 * have been configured with optimizeAcknowledge enabled.
2657 *
2658 * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
2659 */
2660 public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2661 this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2662 }
2663 }