001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import org.apache.activemq.blob.BlobTransferPolicy;
020 import org.apache.activemq.blob.BlobUploader;
021 import org.apache.activemq.command.*;
022 import org.apache.activemq.management.JMSSessionStatsImpl;
023 import org.apache.activemq.management.StatsCapable;
024 import org.apache.activemq.management.StatsImpl;
025 import org.apache.activemq.thread.Scheduler;
026 import org.apache.activemq.transaction.Synchronization;
027 import org.apache.activemq.usage.MemoryUsage;
028 import org.apache.activemq.util.Callback;
029 import org.apache.activemq.util.LongSequenceGenerator;
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032
033 import javax.jms.*;
034 import javax.jms.IllegalStateException;
035 import javax.jms.Message;
036 import java.io.File;
037 import java.io.InputStream;
038 import java.io.Serializable;
039 import java.net.URL;
040 import java.util.Collections;
041 import java.util.Iterator;
042 import java.util.List;
043 import java.util.concurrent.CopyOnWriteArrayList;
044 import java.util.concurrent.atomic.AtomicBoolean;
045
046 /**
047 * <P>
048 * A <CODE>Session</CODE> object is a single-threaded context for producing
049 * and consuming messages. Although it may allocate provider resources outside
050 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
051 * <P>
052 * A session serves several purposes:
053 * <UL>
054 * <LI>It is a factory for its message producers and consumers.
055 * <LI>It supplies provider-optimized message factories.
056 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
057 * <CODE>TemporaryQueues</CODE>.
058 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
059 * objects for those clients that need to dynamically manipulate
060 * provider-specific destination names.
061 * <LI>It supports a single series of transactions that combine work spanning
062 * its producers and consumers into atomic units.
063 * <LI>It defines a serial order for the messages it consumes and the messages
064 * it produces.
065 * <LI>It retains messages it consumes until they have been acknowledged.
066 * <LI>It serializes execution of message listeners registered with its message
067 * consumers.
068 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
069 * </UL>
070 * <P>
071 * A session can create and service multiple message producers and consumers.
072 * <P>
073 * One typical use is to have a thread block on a synchronous
074 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
075 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
076 * <P>
077 * If a client desires to have one thread produce messages while others consume
078 * them, the client should use a separate session for its producing thread.
079 * <P>
080 * Once a connection has been started, any session with one or more registered
081 * message listeners is dedicated to the thread of control that delivers
082 * messages to it. It is erroneous for client code to use this session or any of
083 * its constituent objects from another thread of control. The only exception to
084 * this rule is the use of the session or connection <CODE>close</CODE>
085 * method.
086 * <P>
087 * It should be easy for most clients to partition their work naturally into
088 * sessions. This model allows clients to start simply and incrementally add
089 * message processing complexity as their need for concurrency grows.
090 * <P>
091 * The <CODE>close</CODE> method is the only session method that can be called
092 * while some other session method is being executed in another thread.
093 * <P>
094 * A session may be specified as transacted. Each transacted session supports a
095 * single series of transactions. Each transaction groups a set of message sends
096 * and a set of message receives into an atomic unit of work. In effect,
097 * transactions organize a session's input message stream and output message
098 * stream into series of atomic units. When a transaction commits, its atomic
099 * unit of input is acknowledged and its associated atomic unit of output is
100 * sent. If a transaction rollback is done, the transaction's sent messages are
101 * destroyed and the session's input is automatically recovered.
102 * <P>
103 * The content of a transaction's input and output units is simply those
104 * messages that have been produced and consumed within the session's current
105 * transaction.
106 * <P>
107 * A transaction is completed using either its session's <CODE>commit</CODE>
108 * method or its session's <CODE>rollback </CODE> method. The completion of a
109 * session's current transaction automatically begins the next. The result is
110 * that a transacted session always has a current transaction within which its
111 * work is done.
112 * <P>
113 * The Java Transaction Service (JTS) or some other transaction monitor may be
114 * used to combine a session's transaction with transactions on other resources
115 * (databases, other JMS sessions, etc.). Since Java distributed transactions
116 * are controlled via the Java Transaction API (JTA), use of the session's
117 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
118 * prohibited.
119 * <P>
120 * The JMS API does not require support for JTA; however, it does define how a
121 * provider supplies this support.
122 * <P>
123 * Although it is also possible for a JMS client to handle distributed
124 * transactions directly, it is unlikely that many JMS clients will do this.
125 * Support for JTA in the JMS API is targeted at systems vendors who will be
126 * integrating the JMS API into their application server products.
127 *
128 * @version $Revision: 1.34 $
129 * @see javax.jms.Session
130 * @see javax.jms.QueueSession
131 * @see javax.jms.TopicSession
132 * @see javax.jms.XASession
133 */
134 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
135
136 /**
137 * Only acknowledge an individual message - using message.acknowledge()
138 * as opposed to CLIENT_ACKNOWLEDGE which
139 * acknowledges all messages consumed by a session at when acknowledge()
140 * is called
141 */
142 public static final int INDIVIDUAL_ACKNOWLEDGE=4;
143
144 public static interface DeliveryListener {
145 void beforeDelivery(ActiveMQSession session, Message msg);
146
147 void afterDelivery(ActiveMQSession session, Message msg);
148 }
149
150 private static final Log LOG = LogFactory.getLog(ActiveMQSession.class);
151
152 protected int acknowledgementMode;
153 protected final ActiveMQConnection connection;
154 protected final SessionInfo info;
155 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
156 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
157 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
158 protected final ActiveMQSessionExecutor executor = new ActiveMQSessionExecutor(this);
159 protected final AtomicBoolean started = new AtomicBoolean(false);
160
161 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
162 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
163
164 protected boolean closed;
165 protected boolean asyncDispatch;
166 protected boolean sessionAsyncDispatch;
167 protected final boolean debug;
168 protected Object sendMutex = new Object();
169
170 private MessageListener messageListener;
171 private JMSSessionStatsImpl stats;
172 private TransactionContext transactionContext;
173 private DeliveryListener deliveryListener;
174 private MessageTransformer transformer;
175 private BlobTransferPolicy blobTransferPolicy;
176
177 /**
178 * Construct the Session
179 *
180 * @param connection
181 * @param sessionId
182 * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
183 * Session.SESSION_TRANSACTED
184 * @param asyncDispatch
185 * @param sessionAsyncDispatch
186 * @throws JMSException on internal error
187 */
188 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
189 this.debug = LOG.isDebugEnabled();
190 this.connection = connection;
191 this.acknowledgementMode = acknowledgeMode;
192 this.asyncDispatch = asyncDispatch;
193 this.sessionAsyncDispatch = sessionAsyncDispatch;
194 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
195 setTransactionContext(new TransactionContext(connection));
196 connection.addSession(this);
197 stats = new JMSSessionStatsImpl(producers, consumers);
198 this.connection.asyncSendPacket(info);
199 setTransformer(connection.getTransformer());
200 setBlobTransferPolicy(connection.getBlobTransferPolicy());
201
202 if (connection.isStarted()) {
203 start();
204 }
205
206 }
207
208 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
209 this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
210 }
211
212 /**
213 * Sets the transaction context of the session.
214 *
215 * @param transactionContext - provides the means to control a JMS
216 * transaction.
217 */
218 public void setTransactionContext(TransactionContext transactionContext) {
219 this.transactionContext = transactionContext;
220 }
221
222 /**
223 * Returns the transaction context of the session.
224 *
225 * @return transactionContext - session's transaction context.
226 */
227 public TransactionContext getTransactionContext() {
228 return transactionContext;
229 }
230
231 /*
232 * (non-Javadoc)
233 *
234 * @see org.apache.activemq.management.StatsCapable#getStats()
235 */
236 public StatsImpl getStats() {
237 return stats;
238 }
239
240 /**
241 * Returns the session's statistics.
242 *
243 * @return stats - session's statistics.
244 */
245 public JMSSessionStatsImpl getSessionStats() {
246 return stats;
247 }
248
249 /**
250 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
251 * object is used to send a message containing a stream of uninterpreted
252 * bytes.
253 *
254 * @return the an ActiveMQBytesMessage
255 * @throws JMSException if the JMS provider fails to create this message due
256 * to some internal error.
257 */
258 public BytesMessage createBytesMessage() throws JMSException {
259 ActiveMQBytesMessage message = new ActiveMQBytesMessage();
260 configureMessage(message);
261 return message;
262 }
263
264 /**
265 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
266 * object is used to send a self-defining set of name-value pairs, where
267 * names are <CODE>String</CODE> objects and values are primitive values
268 * in the Java programming language.
269 *
270 * @return an ActiveMQMapMessage
271 * @throws JMSException if the JMS provider fails to create this message due
272 * to some internal error.
273 */
274 public MapMessage createMapMessage() throws JMSException {
275 ActiveMQMapMessage message = new ActiveMQMapMessage();
276 configureMessage(message);
277 return message;
278 }
279
280 /**
281 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
282 * interface is the root interface of all JMS messages. A
283 * <CODE>Message</CODE> object holds all the standard message header
284 * information. It can be sent when a message containing only header
285 * information is sufficient.
286 *
287 * @return an ActiveMQMessage
288 * @throws JMSException if the JMS provider fails to create this message due
289 * to some internal error.
290 */
291 public Message createMessage() throws JMSException {
292 ActiveMQMessage message = new ActiveMQMessage();
293 configureMessage(message);
294 return message;
295 }
296
297 /**
298 * Creates an <CODE>ObjectMessage</CODE> object. An
299 * <CODE>ObjectMessage</CODE> object is used to send a message that
300 * contains a serializable Java object.
301 *
302 * @return an ActiveMQObjectMessage
303 * @throws JMSException if the JMS provider fails to create this message due
304 * to some internal error.
305 */
306 public ObjectMessage createObjectMessage() throws JMSException {
307 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
308 configureMessage(message);
309 return message;
310 }
311
312 /**
313 * Creates an initialized <CODE>ObjectMessage</CODE> object. An
314 * <CODE>ObjectMessage</CODE> object is used to send a message that
315 * contains a serializable Java object.
316 *
317 * @param object the object to use to initialize this message
318 * @return an ActiveMQObjectMessage
319 * @throws JMSException if the JMS provider fails to create this message due
320 * to some internal error.
321 */
322 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
323 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
324 configureMessage(message);
325 message.setObject(object);
326 return message;
327 }
328
329 /**
330 * Creates a <CODE>StreamMessage</CODE> object. A
331 * <CODE>StreamMessage</CODE> object is used to send a self-defining
332 * stream of primitive values in the Java programming language.
333 *
334 * @return an ActiveMQStreamMessage
335 * @throws JMSException if the JMS provider fails to create this message due
336 * to some internal error.
337 */
338 public StreamMessage createStreamMessage() throws JMSException {
339 ActiveMQStreamMessage message = new ActiveMQStreamMessage();
340 configureMessage(message);
341 return message;
342 }
343
344 /**
345 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
346 * object is used to send a message containing a <CODE>String</CODE>
347 * object.
348 *
349 * @return an ActiveMQTextMessage
350 * @throws JMSException if the JMS provider fails to create this message due
351 * to some internal error.
352 */
353 public TextMessage createTextMessage() throws JMSException {
354 ActiveMQTextMessage message = new ActiveMQTextMessage();
355 configureMessage(message);
356 return message;
357 }
358
359 /**
360 * Creates an initialized <CODE>TextMessage</CODE> object. A
361 * <CODE>TextMessage</CODE> object is used to send a message containing a
362 * <CODE>String</CODE>.
363 *
364 * @param text the string used to initialize this message
365 * @return an ActiveMQTextMessage
366 * @throws JMSException if the JMS provider fails to create this message due
367 * to some internal error.
368 */
369 public TextMessage createTextMessage(String text) throws JMSException {
370 ActiveMQTextMessage message = new ActiveMQTextMessage();
371 message.setText(text);
372 configureMessage(message);
373 return message;
374 }
375
376 /**
377 * Creates an initialized <CODE>BlobMessage</CODE> object. A
378 * <CODE>BlobMessage</CODE> object is used to send a message containing a
379 * <CODE>URL</CODE> which points to some network addressible BLOB.
380 *
381 * @param url the network addressable URL used to pass directly to the
382 * consumer
383 * @return a BlobMessage
384 * @throws JMSException if the JMS provider fails to create this message due
385 * to some internal error.
386 */
387 public BlobMessage createBlobMessage(URL url) throws JMSException {
388 return createBlobMessage(url, false);
389 }
390
391 /**
392 * Creates an initialized <CODE>BlobMessage</CODE> object. A
393 * <CODE>BlobMessage</CODE> object is used to send a message containing a
394 * <CODE>URL</CODE> which points to some network addressible BLOB.
395 *
396 * @param url the network addressable URL used to pass directly to the
397 * consumer
398 * @param deletedByBroker indicates whether or not the resource is deleted
399 * by the broker when the message is acknowledged
400 * @return a BlobMessage
401 * @throws JMSException if the JMS provider fails to create this message due
402 * to some internal error.
403 */
404 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
405 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
406 configureMessage(message);
407 message.setURL(url);
408 message.setDeletedByBroker(deletedByBroker);
409 return message;
410 }
411
412 /**
413 * Creates an initialized <CODE>BlobMessage</CODE> object. A
414 * <CODE>BlobMessage</CODE> object is used to send a message containing
415 * the <CODE>File</CODE> content. Before the message is sent the file
416 * conent will be uploaded to the broker or some other remote repository
417 * depending on the {@link #getBlobTransferPolicy()}.
418 *
419 * @param file the file to be uploaded to some remote repo (or the broker)
420 * depending on the strategy
421 * @return a BlobMessage
422 * @throws JMSException if the JMS provider fails to create this message due
423 * to some internal error.
424 */
425 public BlobMessage createBlobMessage(File file) throws JMSException {
426 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
427 configureMessage(message);
428 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
429 message.setDeletedByBroker(true);
430 message.setName(file.getName());
431 return message;
432 }
433
434 /**
435 * Creates an initialized <CODE>BlobMessage</CODE> object. A
436 * <CODE>BlobMessage</CODE> object is used to send a message containing
437 * the <CODE>File</CODE> content. Before the message is sent the file
438 * conent will be uploaded to the broker or some other remote repository
439 * depending on the {@link #getBlobTransferPolicy()}.
440 *
441 * @param in the stream to be uploaded to some remote repo (or the broker)
442 * depending on the strategy
443 * @return a BlobMessage
444 * @throws JMSException if the JMS provider fails to create this message due
445 * to some internal error.
446 */
447 public BlobMessage createBlobMessage(InputStream in) throws JMSException {
448 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
449 configureMessage(message);
450 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
451 message.setDeletedByBroker(true);
452 return message;
453 }
454
455 /**
456 * Indicates whether the session is in transacted mode.
457 *
458 * @return true if the session is in transacted mode
459 * @throws JMSException if there is some internal error.
460 */
461 public boolean getTransacted() throws JMSException {
462 checkClosed();
463 return (acknowledgementMode == Session.SESSION_TRANSACTED) || (transactionContext.isInXATransaction());
464 }
465
466 /**
467 * Returns the acknowledgement mode of the session. The acknowledgement mode
468 * is set at the time that the session is created. If the session is
469 * transacted, the acknowledgement mode is ignored.
470 *
471 * @return If the session is not transacted, returns the current
472 * acknowledgement mode for the session. If the session is
473 * transacted, returns SESSION_TRANSACTED.
474 * @throws JMSException
475 * @see javax.jms.Connection#createSession(boolean,int)
476 * @since 1.1 exception JMSException if there is some internal error.
477 */
478 public int getAcknowledgeMode() throws JMSException {
479 checkClosed();
480 return this.acknowledgementMode;
481 }
482
483 /**
484 * Commits all messages done in this transaction and releases any locks
485 * currently held.
486 *
487 * @throws JMSException if the JMS provider fails to commit the transaction
488 * due to some internal error.
489 * @throws TransactionRolledBackException if the transaction is rolled back
490 * due to some internal error during commit.
491 * @throws javax.jms.IllegalStateException if the method is not called by a
492 * transacted session.
493 */
494 public void commit() throws JMSException {
495 checkClosed();
496 if (!getTransacted()) {
497 throw new javax.jms.IllegalStateException("Not a transacted session");
498 }
499 if (LOG.isDebugEnabled()) {
500 LOG.debug(getSessionId() + " Transaction Commit");
501 }
502 transactionContext.commit();
503 }
504
505 /**
506 * Rolls back any messages done in this transaction and releases any locks
507 * currently held.
508 *
509 * @throws JMSException if the JMS provider fails to roll back the
510 * transaction due to some internal error.
511 * @throws javax.jms.IllegalStateException if the method is not called by a
512 * transacted session.
513 */
514 public void rollback() throws JMSException {
515 checkClosed();
516 if (!getTransacted()) {
517 throw new javax.jms.IllegalStateException("Not a transacted session");
518 }
519 if (LOG.isDebugEnabled()) {
520 LOG.debug(getSessionId() + " Transaction Rollback");
521 }
522 transactionContext.rollback();
523 }
524
525 /**
526 * Closes the session.
527 * <P>
528 * Since a provider may allocate some resources on behalf of a session
529 * outside the JVM, clients should close the resources when they are not
530 * needed. Relying on garbage collection to eventually reclaim these
531 * resources may not be timely enough.
532 * <P>
533 * There is no need to close the producers and consumers of a closed
534 * session.
535 * <P>
536 * This call will block until a <CODE>receive</CODE> call or message
537 * listener in progress has completed. A blocked message consumer
538 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
539 * is closed.
540 * <P>
541 * Closing a transacted session must roll back the transaction in progress.
542 * <P>
543 * This method is the only <CODE>Session</CODE> method that can be called
544 * concurrently.
545 * <P>
546 * Invoking any other <CODE>Session</CODE> method on a closed session must
547 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
548 * closed session must <I>not </I> throw an exception.
549 *
550 * @throws JMSException if the JMS provider fails to close the session due
551 * to some internal error.
552 */
553 public void close() throws JMSException {
554 if (!closed) {
555 dispose();
556 connection.asyncSendPacket(info.createRemoveCommand());
557 }
558 }
559
560 void clearMessagesInProgress() {
561 executor.clearMessagesInProgress();
562 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
563 ActiveMQMessageConsumer consumer = iter.next();
564 consumer.clearMessagesInProgress();
565 }
566 }
567
568 void deliverAcks() {
569 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
570 ActiveMQMessageConsumer consumer = iter.next();
571 consumer.deliverAcks();
572 }
573 }
574
575 public synchronized void dispose() throws JMSException {
576 if (!closed) {
577
578 try {
579 executor.stop();
580
581 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
582 ActiveMQMessageConsumer consumer = iter.next();
583 consumer.dispose();
584 }
585 consumers.clear();
586
587 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
588 ActiveMQMessageProducer producer = iter.next();
589 producer.dispose();
590 }
591 producers.clear();
592
593 try {
594 if (getTransactionContext().isInLocalTransaction()) {
595 rollback();
596 }
597 } catch (JMSException e) {
598 }
599
600 } finally {
601 connection.removeSession(this);
602 this.transactionContext = null;
603 closed = true;
604 }
605 }
606 }
607
608 /**
609 * Checks that the session is not closed then configures the message
610 */
611 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
612 checkClosed();
613 message.setConnection(connection);
614 }
615
616 /**
617 * Check if the session is closed. It is used for ensuring that the session
618 * is open before performing various operations.
619 *
620 * @throws IllegalStateException if the Session is closed
621 */
622 protected void checkClosed() throws IllegalStateException {
623 if (closed) {
624 throw new IllegalStateException("The Session is closed");
625 }
626 }
627
628 /**
629 * Stops message delivery in this session, and restarts message delivery
630 * with the oldest unacknowledged message.
631 * <P>
632 * All consumers deliver messages in a serial order. Acknowledging a
633 * received message automatically acknowledges all messages that have been
634 * delivered to the client.
635 * <P>
636 * Restarting a session causes it to take the following actions:
637 * <UL>
638 * <LI>Stop message delivery
639 * <LI>Mark all messages that might have been delivered but not
640 * acknowledged as "redelivered"
641 * <LI>Restart the delivery sequence including all unacknowledged messages
642 * that had been previously delivered. Redelivered messages do not have to
643 * be delivered in exactly their original delivery order.
644 * </UL>
645 *
646 * @throws JMSException if the JMS provider fails to stop and restart
647 * message delivery due to some internal error.
648 * @throws IllegalStateException if the method is called by a transacted
649 * session.
650 */
651 public void recover() throws JMSException {
652
653 checkClosed();
654 if (getTransacted()) {
655 throw new IllegalStateException("This session is transacted");
656 }
657
658 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
659 ActiveMQMessageConsumer c = iter.next();
660 c.rollback();
661 }
662
663 }
664
665 /**
666 * Returns the session's distinguished message listener (optional).
667 *
668 * @return the message listener associated with this session
669 * @throws JMSException if the JMS provider fails to get the message
670 * listener due to an internal error.
671 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
672 * @see javax.jms.ServerSessionPool
673 * @see javax.jms.ServerSession
674 */
675 public MessageListener getMessageListener() throws JMSException {
676 checkClosed();
677 return this.messageListener;
678 }
679
680 /**
681 * Sets the session's distinguished message listener (optional).
682 * <P>
683 * When the distinguished message listener is set, no other form of message
684 * receipt in the session can be used; however, all forms of sending
685 * messages are still supported.
686 * <P>
687 * This is an expert facility not used by regular JMS clients.
688 *
689 * @param listener the message listener to associate with this session
690 * @throws JMSException if the JMS provider fails to set the message
691 * listener due to an internal error.
692 * @see javax.jms.Session#getMessageListener()
693 * @see javax.jms.ServerSessionPool
694 * @see javax.jms.ServerSession
695 */
696 public void setMessageListener(MessageListener listener) throws JMSException {
697 checkClosed();
698 this.messageListener = listener;
699
700 if (listener != null) {
701 executor.setDispatchedBySessionPool(true);
702 }
703 }
704
705 /**
706 * Optional operation, intended to be used only by Application Servers, not
707 * by ordinary JMS clients.
708 *
709 * @see javax.jms.ServerSession
710 */
711 public void run() {
712 MessageDispatch messageDispatch;
713 while ((messageDispatch = executor.dequeueNoWait()) != null) {
714 final MessageDispatch md = messageDispatch;
715 ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
716 if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
717 // TODO: Ack it without delivery to client
718 continue;
719 }
720
721 if (isClientAcknowledge()||isIndividualAcknowledge()) {
722 message.setAcknowledgeCallback(new Callback() {
723 public void execute() throws Exception {
724 }
725 });
726 }
727
728 if (deliveryListener != null) {
729 deliveryListener.beforeDelivery(this, message);
730 }
731
732 md.setDeliverySequenceId(getNextDeliveryId());
733
734 try {
735 messageListener.onMessage(message);
736 } catch (RuntimeException e) {
737 LOG.error("error dispatching message: ", e);
738 // A problem while invoking the MessageListener does not
739 // in general indicate a problem with the connection to the broker, i.e.
740 // it will usually be sufficient to let the afterDelivery() method either
741 // commit or roll back in order to deal with the exception.
742 // However, we notify any registered client internal exception listener
743 // of the problem.
744 connection.onClientInternalException(e);
745 }
746
747 try {
748 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
749 ack.setFirstMessageId(md.getMessage().getMessageId());
750 doStartTransaction();
751 ack.setTransactionId(getTransactionContext().getTransactionId());
752 if (ack.getTransactionId() != null) {
753 getTransactionContext().addSynchronization(new Synchronization() {
754
755 public void afterRollback() throws Exception {
756 md.getMessage().onMessageRolledBack();
757 // ensure we don't filter this as a duplicate
758 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
759 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
760 int redeliveryCounter = md.getMessage().getRedeliveryCounter();
761 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
762 && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
763 // We need to NACK the messages so that they get
764 // sent to the
765 // DLQ.
766 // Acknowledge the last message.
767 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
768 ack.setFirstMessageId(md.getMessage().getMessageId());
769 asyncSendPacket(ack);
770 } else {
771
772 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
773 ack.setFirstMessageId(md.getMessage().getMessageId());
774 asyncSendPacket(ack);
775
776 // Figure out how long we should wait to resend
777 // this message.
778 long redeliveryDelay = 0;
779 for (int i = 0; i < redeliveryCounter; i++) {
780 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
781 }
782 Scheduler.executeAfterDelay(new Runnable() {
783
784 public void run() {
785 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
786 }
787 }, redeliveryDelay);
788 }
789 }
790 });
791 }
792 asyncSendPacket(ack);
793 } catch (Throwable e) {
794 connection.onClientInternalException(e);
795 }
796
797 if (deliveryListener != null) {
798 deliveryListener.afterDelivery(this, message);
799 }
800 }
801 }
802
803 /**
804 * Creates a <CODE>MessageProducer</CODE> to send messages to the
805 * specified destination.
806 * <P>
807 * A client uses a <CODE>MessageProducer</CODE> object to send messages to
808 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
809 * inherit from <CODE>Destination</CODE>, they can be used in the
810 * destination parameter to create a <CODE>MessageProducer</CODE> object.
811 *
812 * @param destination the <CODE>Destination</CODE> to send to, or null if
813 * this is a producer which does not have a specified
814 * destination.
815 * @return the MessageProducer
816 * @throws JMSException if the session fails to create a MessageProducer due
817 * to some internal error.
818 * @throws InvalidDestinationException if an invalid destination is
819 * specified.
820 * @since 1.1
821 */
822 public MessageProducer createProducer(Destination destination) throws JMSException {
823 checkClosed();
824 if (destination instanceof CustomDestination) {
825 CustomDestination customDestination = (CustomDestination)destination;
826 return customDestination.createProducer(this);
827 }
828 int timeSendOut = connection.getSendTimeout();
829 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
830 }
831
832 /**
833 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
834 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
835 * <CODE>Destination</CODE>, they can be used in the destination
836 * parameter to create a <CODE>MessageConsumer</CODE>.
837 *
838 * @param destination the <CODE>Destination</CODE> to access.
839 * @return the MessageConsumer
840 * @throws JMSException if the session fails to create a consumer due to
841 * some internal error.
842 * @throws InvalidDestinationException if an invalid destination is
843 * specified.
844 * @since 1.1
845 */
846 public MessageConsumer createConsumer(Destination destination) throws JMSException {
847 return createConsumer(destination, (String) null);
848 }
849
850 /**
851 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
852 * using a message selector. Since <CODE> Queue</CODE> and
853 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
854 * can be used in the destination parameter to create a
855 * <CODE>MessageConsumer</CODE>.
856 * <P>
857 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
858 * that have been sent to a destination.
859 *
860 * @param destination the <CODE>Destination</CODE> to access
861 * @param messageSelector only messages with properties matching the message
862 * selector expression are delivered. A value of null or an
863 * empty string indicates that there is no message selector
864 * for the message consumer.
865 * @return the MessageConsumer
866 * @throws JMSException if the session fails to create a MessageConsumer due
867 * to some internal error.
868 * @throws InvalidDestinationException if an invalid destination is
869 * specified.
870 * @throws InvalidSelectorException if the message selector is invalid.
871 * @since 1.1
872 */
873 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
874 return createConsumer(destination, messageSelector, false);
875 }
876
877 /**
878 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
879 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
880 * <CODE>Destination</CODE>, they can be used in the destination
881 * parameter to create a <CODE>MessageConsumer</CODE>.
882 *
883 * @param destination the <CODE>Destination</CODE> to access.
884 * @param messageListener the listener to use for async consumption of messages
885 * @return the MessageConsumer
886 * @throws JMSException if the session fails to create a consumer due to
887 * some internal error.
888 * @throws InvalidDestinationException if an invalid destination is
889 * specified.
890 * @since 1.1
891 */
892 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
893 return createConsumer(destination, null, messageListener);
894 }
895
896 /**
897 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
898 * using a message selector. Since <CODE> Queue</CODE> and
899 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
900 * can be used in the destination parameter to create a
901 * <CODE>MessageConsumer</CODE>.
902 * <P>
903 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
904 * that have been sent to a destination.
905 *
906 * @param destination the <CODE>Destination</CODE> to access
907 * @param messageSelector only messages with properties matching the message
908 * selector expression are delivered. A value of null or an
909 * empty string indicates that there is no message selector
910 * for the message consumer.
911 * @param messageListener the listener to use for async consumption of messages
912 * @return the MessageConsumer
913 * @throws JMSException if the session fails to create a MessageConsumer due
914 * to some internal error.
915 * @throws InvalidDestinationException if an invalid destination is
916 * specified.
917 * @throws InvalidSelectorException if the message selector is invalid.
918 * @since 1.1
919 */
920 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
921 return createConsumer(destination, messageSelector, false, messageListener);
922 }
923
924 /**
925 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
926 * using a message selector. This method can specify whether messages
927 * published by its own connection should be delivered to it, if the
928 * destination is a topic.
929 * <P>
930 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
931 * <CODE>Destination</CODE>, they can be used in the destination
932 * parameter to create a <CODE>MessageConsumer</CODE>.
933 * <P>
934 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
935 * that have been published to a destination.
936 * <P>
937 * In some cases, a connection may both publish and subscribe to a topic.
938 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
939 * inhibit the delivery of messages published by its own connection. The
940 * default value for this attribute is False. The <CODE>noLocal</CODE>
941 * value must be supported by destinations that are topics.
942 *
943 * @param destination the <CODE>Destination</CODE> to access
944 * @param messageSelector only messages with properties matching the message
945 * selector expression are delivered. A value of null or an
946 * empty string indicates that there is no message selector
947 * for the message consumer.
948 * @param noLocal - if true, and the destination is a topic, inhibits the
949 * delivery of messages published by its own connection. The
950 * behavior for <CODE>NoLocal</CODE> is not specified if
951 * the destination is a queue.
952 * @return the MessageConsumer
953 * @throws JMSException if the session fails to create a MessageConsumer due
954 * to some internal error.
955 * @throws InvalidDestinationException if an invalid destination is
956 * specified.
957 * @throws InvalidSelectorException if the message selector is invalid.
958 * @since 1.1
959 */
960 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
961 return createConsumer(destination, messageSelector, noLocal, null);
962 }
963
964 /**
965 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
966 * using a message selector. This method can specify whether messages
967 * published by its own connection should be delivered to it, if the
968 * destination is a topic.
969 * <P>
970 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
971 * <CODE>Destination</CODE>, they can be used in the destination
972 * parameter to create a <CODE>MessageConsumer</CODE>.
973 * <P>
974 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
975 * that have been published to a destination.
976 * <P>
977 * In some cases, a connection may both publish and subscribe to a topic.
978 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
979 * inhibit the delivery of messages published by its own connection. The
980 * default value for this attribute is False. The <CODE>noLocal</CODE>
981 * value must be supported by destinations that are topics.
982 *
983 * @param destination the <CODE>Destination</CODE> to access
984 * @param messageSelector only messages with properties matching the message
985 * selector expression are delivered. A value of null or an
986 * empty string indicates that there is no message selector
987 * for the message consumer.
988 * @param noLocal - if true, and the destination is a topic, inhibits the
989 * delivery of messages published by its own connection. The
990 * behavior for <CODE>NoLocal</CODE> is not specified if
991 * the destination is a queue.
992 * @param messageListener the listener to use for async consumption of messages
993 * @return the MessageConsumer
994 * @throws JMSException if the session fails to create a MessageConsumer due
995 * to some internal error.
996 * @throws InvalidDestinationException if an invalid destination is
997 * specified.
998 * @throws InvalidSelectorException if the message selector is invalid.
999 * @since 1.1
1000 */
1001 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1002 checkClosed();
1003
1004 if (destination instanceof CustomDestination) {
1005 CustomDestination customDestination = (CustomDestination)destination;
1006 return customDestination.createConsumer(this, messageSelector, noLocal);
1007 }
1008
1009 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1010 int prefetch = 0;
1011 if (destination instanceof Topic) {
1012 prefetch = prefetchPolicy.getTopicPrefetch();
1013 } else {
1014 prefetch = prefetchPolicy.getQueuePrefetch();
1015 }
1016 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1017 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1018 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1019 }
1020
1021 /**
1022 * Creates a queue identity given a <CODE>Queue</CODE> name.
1023 * <P>
1024 * This facility is provided for the rare cases where clients need to
1025 * dynamically manipulate queue identity. It allows the creation of a queue
1026 * identity with a provider-specific name. Clients that depend on this
1027 * ability are not portable.
1028 * <P>
1029 * Note that this method is not for creating the physical queue. The
1030 * physical creation of queues is an administrative task and is not to be
1031 * initiated by the JMS API. The one exception is the creation of temporary
1032 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1033 * method.
1034 *
1035 * @param queueName the name of this <CODE>Queue</CODE>
1036 * @return a <CODE>Queue</CODE> with the given name
1037 * @throws JMSException if the session fails to create a queue due to some
1038 * internal error.
1039 * @since 1.1
1040 */
1041 public Queue createQueue(String queueName) throws JMSException {
1042 checkClosed();
1043 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1044 return new ActiveMQTempQueue(queueName);
1045 }
1046 return new ActiveMQQueue(queueName);
1047 }
1048
1049 /**
1050 * Creates a topic identity given a <CODE>Topic</CODE> name.
1051 * <P>
1052 * This facility is provided for the rare cases where clients need to
1053 * dynamically manipulate topic identity. This allows the creation of a
1054 * topic identity with a provider-specific name. Clients that depend on this
1055 * ability are not portable.
1056 * <P>
1057 * Note that this method is not for creating the physical topic. The
1058 * physical creation of topics is an administrative task and is not to be
1059 * initiated by the JMS API. The one exception is the creation of temporary
1060 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1061 * method.
1062 *
1063 * @param topicName the name of this <CODE>Topic</CODE>
1064 * @return a <CODE>Topic</CODE> with the given name
1065 * @throws JMSException if the session fails to create a topic due to some
1066 * internal error.
1067 * @since 1.1
1068 */
1069 public Topic createTopic(String topicName) throws JMSException {
1070 checkClosed();
1071 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1072 return new ActiveMQTempTopic(topicName);
1073 }
1074 return new ActiveMQTopic(topicName);
1075 }
1076
1077 /**
1078 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1079 * the specified queue.
1080 *
1081 * @param queue the <CODE>queue</CODE> to access
1082 * @exception InvalidDestinationException if an invalid destination is
1083 * specified
1084 * @since 1.1
1085 */
1086 /**
1087 * Creates a durable subscriber to the specified topic.
1088 * <P>
1089 * If a client needs to receive all the messages published on a topic,
1090 * including the ones published while the subscriber is inactive, it uses a
1091 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1092 * record of this durable subscription and insures that all messages from
1093 * the topic's publishers are retained until they are acknowledged by this
1094 * durable subscriber or they have expired.
1095 * <P>
1096 * Sessions with durable subscribers must always provide the same client
1097 * identifier. In addition, each client must specify a name that uniquely
1098 * identifies (within client identifier) each durable subscription it
1099 * creates. Only one session at a time can have a
1100 * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1101 * <P>
1102 * A client can change an existing durable subscription by creating a
1103 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1104 * and/or message selector. Changing a durable subscriber is equivalent to
1105 * unsubscribing (deleting) the old one and creating a new one.
1106 * <P>
1107 * In some cases, a connection may both publish and subscribe to a topic.
1108 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1109 * inhibit the delivery of messages published by its own connection. The
1110 * default value for this attribute is false.
1111 *
1112 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1113 * @param name the name used to identify this subscription
1114 * @return the TopicSubscriber
1115 * @throws JMSException if the session fails to create a subscriber due to
1116 * some internal error.
1117 * @throws InvalidDestinationException if an invalid topic is specified.
1118 * @since 1.1
1119 */
1120 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1121 checkClosed();
1122 return createDurableSubscriber(topic, name, null, false);
1123 }
1124
1125 /**
1126 * Creates a durable subscriber to the specified topic, using a message
1127 * selector and specifying whether messages published by its own connection
1128 * should be delivered to it.
1129 * <P>
1130 * If a client needs to receive all the messages published on a topic,
1131 * including the ones published while the subscriber is inactive, it uses a
1132 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1133 * record of this durable subscription and insures that all messages from
1134 * the topic's publishers are retained until they are acknowledged by this
1135 * durable subscriber or they have expired.
1136 * <P>
1137 * Sessions with durable subscribers must always provide the same client
1138 * identifier. In addition, each client must specify a name which uniquely
1139 * identifies (within client identifier) each durable subscription it
1140 * creates. Only one session at a time can have a
1141 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1142 * inactive durable subscriber is one that exists but does not currently
1143 * have a message consumer associated with it.
1144 * <P>
1145 * A client can change an existing durable subscription by creating a
1146 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1147 * and/or message selector. Changing a durable subscriber is equivalent to
1148 * unsubscribing (deleting) the old one and creating a new one.
1149 *
1150 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1151 * @param name the name used to identify this subscription
1152 * @param messageSelector only messages with properties matching the message
1153 * selector expression are delivered. A value of null or an
1154 * empty string indicates that there is no message selector
1155 * for the message consumer.
1156 * @param noLocal if set, inhibits the delivery of messages published by its
1157 * own connection
1158 * @return the Queue Browser
1159 * @throws JMSException if the session fails to create a subscriber due to
1160 * some internal error.
1161 * @throws InvalidDestinationException if an invalid topic is specified.
1162 * @throws InvalidSelectorException if the message selector is invalid.
1163 * @since 1.1
1164 */
1165 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1166 checkClosed();
1167
1168 if (topic instanceof CustomDestination) {
1169 CustomDestination customDestination = (CustomDestination)topic;
1170 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1171 }
1172
1173 connection.checkClientIDWasManuallySpecified();
1174 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1175 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1176 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1177 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1178 noLocal, false, asyncDispatch);
1179 }
1180
1181 /**
1182 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1183 * the specified queue.
1184 *
1185 * @param queue the <CODE>queue</CODE> to access
1186 * @return the Queue Browser
1187 * @throws JMSException if the session fails to create a browser due to some
1188 * internal error.
1189 * @throws InvalidDestinationException if an invalid destination is
1190 * specified
1191 * @since 1.1
1192 */
1193 public QueueBrowser createBrowser(Queue queue) throws JMSException {
1194 checkClosed();
1195 return createBrowser(queue, null);
1196 }
1197
1198 /**
1199 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1200 * the specified queue using a message selector.
1201 *
1202 * @param queue the <CODE>queue</CODE> to access
1203 * @param messageSelector only messages with properties matching the message
1204 * selector expression are delivered. A value of null or an
1205 * empty string indicates that there is no message selector
1206 * for the message consumer.
1207 * @return the Queue Browser
1208 * @throws JMSException if the session fails to create a browser due to some
1209 * internal error.
1210 * @throws InvalidDestinationException if an invalid destination is
1211 * specified
1212 * @throws InvalidSelectorException if the message selector is invalid.
1213 * @since 1.1
1214 */
1215 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1216 checkClosed();
1217 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1218 }
1219
1220 /**
1221 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1222 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1223 *
1224 * @return a temporary queue identity
1225 * @throws JMSException if the session fails to create a temporary queue due
1226 * to some internal error.
1227 * @since 1.1
1228 */
1229 public TemporaryQueue createTemporaryQueue() throws JMSException {
1230 checkClosed();
1231 return (TemporaryQueue)connection.createTempDestination(false);
1232 }
1233
1234 /**
1235 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1236 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1237 *
1238 * @return a temporary topic identity
1239 * @throws JMSException if the session fails to create a temporary topic due
1240 * to some internal error.
1241 * @since 1.1
1242 */
1243 public TemporaryTopic createTemporaryTopic() throws JMSException {
1244 checkClosed();
1245 return (TemporaryTopic)connection.createTempDestination(true);
1246 }
1247
1248 /**
1249 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1250 * the specified queue.
1251 *
1252 * @param queue the <CODE>Queue</CODE> to access
1253 * @return
1254 * @throws JMSException if the session fails to create a receiver due to
1255 * some internal error.
1256 * @throws JMSException
1257 * @throws InvalidDestinationException if an invalid queue is specified.
1258 */
1259 public QueueReceiver createReceiver(Queue queue) throws JMSException {
1260 checkClosed();
1261 return createReceiver(queue, null);
1262 }
1263
1264 /**
1265 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1266 * the specified queue using a message selector.
1267 *
1268 * @param queue the <CODE>Queue</CODE> to access
1269 * @param messageSelector only messages with properties matching the message
1270 * selector expression are delivered. A value of null or an
1271 * empty string indicates that there is no message selector
1272 * for the message consumer.
1273 * @return QueueReceiver
1274 * @throws JMSException if the session fails to create a receiver due to
1275 * some internal error.
1276 * @throws InvalidDestinationException if an invalid queue is specified.
1277 * @throws InvalidSelectorException if the message selector is invalid.
1278 */
1279 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1280 checkClosed();
1281
1282 if (queue instanceof CustomDestination) {
1283 CustomDestination customDestination = (CustomDestination)queue;
1284 return customDestination.createReceiver(this, messageSelector);
1285 }
1286
1287 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1288 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1289 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1290 }
1291
1292 /**
1293 * Creates a <CODE>QueueSender</CODE> object to send messages to the
1294 * specified queue.
1295 *
1296 * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1297 * unidentified producer
1298 * @return QueueSender
1299 * @throws JMSException if the session fails to create a sender due to some
1300 * internal error.
1301 * @throws InvalidDestinationException if an invalid queue is specified.
1302 */
1303 public QueueSender createSender(Queue queue) throws JMSException {
1304 checkClosed();
1305 if (queue instanceof CustomDestination) {
1306 CustomDestination customDestination = (CustomDestination)queue;
1307 return customDestination.createSender(this);
1308 }
1309 int timeSendOut = connection.getSendTimeout();
1310 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1311 }
1312
1313 /**
1314 * Creates a nondurable subscriber to the specified topic. <p/>
1315 * <P>
1316 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1317 * that have been published to a topic. <p/>
1318 * <P>
1319 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1320 * receive only messages that are published while they are active. <p/>
1321 * <P>
1322 * In some cases, a connection may both publish and subscribe to a topic.
1323 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1324 * inhibit the delivery of messages published by its own connection. The
1325 * default value for this attribute is false.
1326 *
1327 * @param topic the <CODE>Topic</CODE> to subscribe to
1328 * @return TopicSubscriber
1329 * @throws JMSException if the session fails to create a subscriber due to
1330 * some internal error.
1331 * @throws InvalidDestinationException if an invalid topic is specified.
1332 */
1333 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1334 checkClosed();
1335 return createSubscriber(topic, null, false);
1336 }
1337
1338 /**
1339 * Creates a nondurable subscriber to the specified topic, using a message
1340 * selector or specifying whether messages published by its own connection
1341 * should be delivered to it. <p/>
1342 * <P>
1343 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1344 * that have been published to a topic. <p/>
1345 * <P>
1346 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1347 * receive only messages that are published while they are active. <p/>
1348 * <P>
1349 * Messages filtered out by a subscriber's message selector will never be
1350 * delivered to the subscriber. From the subscriber's perspective, they do
1351 * not exist. <p/>
1352 * <P>
1353 * In some cases, a connection may both publish and subscribe to a topic.
1354 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1355 * inhibit the delivery of messages published by its own connection. The
1356 * default value for this attribute is false.
1357 *
1358 * @param topic the <CODE>Topic</CODE> to subscribe to
1359 * @param messageSelector only messages with properties matching the message
1360 * selector expression are delivered. A value of null or an
1361 * empty string indicates that there is no message selector
1362 * for the message consumer.
1363 * @param noLocal if set, inhibits the delivery of messages published by its
1364 * own connection
1365 * @return TopicSubscriber
1366 * @throws JMSException if the session fails to create a subscriber due to
1367 * some internal error.
1368 * @throws InvalidDestinationException if an invalid topic is specified.
1369 * @throws InvalidSelectorException if the message selector is invalid.
1370 */
1371 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1372 checkClosed();
1373
1374 if (topic instanceof CustomDestination) {
1375 CustomDestination customDestination = (CustomDestination)topic;
1376 return customDestination.createSubscriber(this, messageSelector, noLocal);
1377 }
1378
1379 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1380 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1381 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1382 }
1383
1384 /**
1385 * Creates a publisher for the specified topic. <p/>
1386 * <P>
1387 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1388 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1389 * a topic, it defines a new sequence of messages that have no ordering
1390 * relationship with the messages it has previously sent.
1391 *
1392 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1393 * an unidentified producer
1394 * @return TopicPublisher
1395 * @throws JMSException if the session fails to create a publisher due to
1396 * some internal error.
1397 * @throws InvalidDestinationException if an invalid topic is specified.
1398 */
1399 public TopicPublisher createPublisher(Topic topic) throws JMSException {
1400 checkClosed();
1401
1402 if (topic instanceof CustomDestination) {
1403 CustomDestination customDestination = (CustomDestination)topic;
1404 return customDestination.createPublisher(this);
1405 }
1406 int timeSendOut = connection.getSendTimeout();
1407 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1408 }
1409
1410 /**
1411 * Unsubscribes a durable subscription that has been created by a client.
1412 * <P>
1413 * This method deletes the state being maintained on behalf of the
1414 * subscriber by its provider.
1415 * <P>
1416 * It is erroneous for a client to delete a durable subscription while there
1417 * is an active <CODE>MessageConsumer </CODE> or
1418 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1419 * message is part of a pending transaction or has not been acknowledged in
1420 * the session.
1421 *
1422 * @param name the name used to identify this subscription
1423 * @throws JMSException if the session fails to unsubscribe to the durable
1424 * subscription due to some internal error.
1425 * @throws InvalidDestinationException if an invalid subscription name is
1426 * specified.
1427 * @since 1.1
1428 */
1429 public void unsubscribe(String name) throws JMSException {
1430 checkClosed();
1431 connection.unsubscribe(name);
1432 }
1433
1434 public void dispatch(MessageDispatch messageDispatch) {
1435 try {
1436 executor.execute(messageDispatch);
1437 } catch (InterruptedException e) {
1438 Thread.currentThread().interrupt();
1439 connection.onClientInternalException(e);
1440 }
1441 }
1442
1443 /**
1444 * Acknowledges all consumed messages of the session of this consumed
1445 * message.
1446 * <P>
1447 * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1448 * for use when a client has specified that its JMS session's consumed
1449 * messages are to be explicitly acknowledged. By invoking
1450 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1451 * all messages consumed by the session that the message was delivered to.
1452 * <P>
1453 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1454 * sessions and sessions specified to use implicit acknowledgement modes.
1455 * <P>
1456 * A client may individually acknowledge each message as it is consumed, or
1457 * it may choose to acknowledge messages as an application-defined group
1458 * (which is done by calling acknowledge on the last received message of the
1459 * group, thereby acknowledging all messages consumed by the session.)
1460 * <P>
1461 * Messages that have been received but not acknowledged may be redelivered.
1462 *
1463 * @throws JMSException if the JMS provider fails to acknowledge the
1464 * messages due to some internal error.
1465 * @throws javax.jms.IllegalStateException if this method is called on a
1466 * closed session.
1467 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1468 */
1469 public void acknowledge() throws JMSException {
1470 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1471 ActiveMQMessageConsumer c = iter.next();
1472 c.acknowledge();
1473 }
1474 }
1475
1476 /**
1477 * Add a message consumer.
1478 *
1479 * @param consumer - message consumer.
1480 * @throws JMSException
1481 */
1482 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1483 this.consumers.add(consumer);
1484 if (consumer.isDurableSubscriber()) {
1485 stats.onCreateDurableSubscriber();
1486 }
1487 this.connection.addDispatcher(consumer.getConsumerId(), this);
1488 }
1489
1490 /**
1491 * Remove the message consumer.
1492 *
1493 * @param consumer - consumer to be removed.
1494 * @throws JMSException
1495 */
1496 protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1497 this.connection.removeDispatcher(consumer.getConsumerId());
1498 if (consumer.isDurableSubscriber()) {
1499 stats.onRemoveDurableSubscriber();
1500 }
1501 this.consumers.remove(consumer);
1502 this.connection.removeDispatcher(consumer);
1503 }
1504
1505 /**
1506 * Adds a message producer.
1507 *
1508 * @param producer - message producer to be added.
1509 * @throws JMSException
1510 */
1511 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1512 this.producers.add(producer);
1513 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1514 }
1515
1516 /**
1517 * Removes a message producer.
1518 *
1519 * @param producer - message producer to be removed.
1520 * @throws JMSException
1521 */
1522 protected void removeProducer(ActiveMQMessageProducer producer) {
1523 this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1524 this.producers.remove(producer);
1525 }
1526
1527 /**
1528 * Start this Session.
1529 *
1530 * @throws JMSException
1531 */
1532 protected void start() throws JMSException {
1533 started.set(true);
1534 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1535 ActiveMQMessageConsumer c = iter.next();
1536 c.start();
1537 }
1538 executor.start();
1539 }
1540
1541 /**
1542 * Stops this session.
1543 *
1544 * @throws JMSException
1545 */
1546 protected void stop() throws JMSException {
1547
1548 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1549 ActiveMQMessageConsumer c = iter.next();
1550 c.stop();
1551 }
1552
1553 started.set(false);
1554 executor.stop();
1555 }
1556
1557 /**
1558 * Returns the session id.
1559 *
1560 * @return value - session id.
1561 */
1562 protected SessionId getSessionId() {
1563 return info.getSessionId();
1564 }
1565
1566 /**
1567 * @return
1568 */
1569 protected ConsumerId getNextConsumerId() {
1570 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1571 }
1572
1573 /**
1574 * @return
1575 */
1576 protected ProducerId getNextProducerId() {
1577 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1578 }
1579
1580 /**
1581 * Sends the message for dispatch by the broker.
1582 *
1583 * @param producer - message producer.
1584 * @param destination - message destination.
1585 * @param message - message to be sent.
1586 * @param deliveryMode - JMS messsage delivery mode.
1587 * @param priority - message priority.
1588 * @param timeToLive - message expiration.
1589 * @param producerWindow
1590 * @throws JMSException
1591 */
1592 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1593 MemoryUsage producerWindow, int sendTimeout) throws JMSException {
1594
1595 checkClosed();
1596 if (destination.isTemporary() && connection.isDeleted(destination)) {
1597 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1598 }
1599 synchronized (sendMutex) {
1600 // tell the Broker we are about to start a new transaction
1601 doStartTransaction();
1602 TransactionId txid = transactionContext.getTransactionId();
1603 long sequenceNumber = producer.getMessageSequence();
1604
1605 // transform to our own message format here
1606 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1607
1608 // Set the message id.
1609 if (msg == message) {
1610 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1611 } else {
1612 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1613 message.setJMSMessageID(msg.getMessageId().toString());
1614 }
1615 //clear the brokerPath in case we are re-sending this message
1616 msg.setBrokerPath(null);
1617
1618 msg.setJMSDestination(destination);
1619 msg.setJMSDeliveryMode(deliveryMode);
1620 long expiration = 0L;
1621 if (!producer.getDisableMessageTimestamp()) {
1622 long timeStamp = System.currentTimeMillis();
1623 msg.setJMSTimestamp(timeStamp);
1624 if (timeToLive > 0) {
1625 expiration = timeToLive + timeStamp;
1626 }
1627 }
1628 msg.setJMSExpiration(expiration);
1629 msg.setJMSPriority(priority);
1630 msg.setJMSRedelivered(false);
1631
1632 msg.setTransactionId(txid);
1633 if (connection.isCopyMessageOnSend()) {
1634 msg = (ActiveMQMessage)msg.copy();
1635 }
1636 msg.setConnection(connection);
1637 msg.onSend();
1638 msg.setProducerId(msg.getMessageId().getProducerId());
1639 if (this.debug) {
1640 LOG.debug(getSessionId() + " sending message: " + msg);
1641 }
1642 if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1643 this.connection.asyncSendPacket(msg);
1644 if (producerWindow != null) {
1645 // Since we defer lots of the marshaling till we hit the
1646 // wire, this might not
1647 // provide and accurate size. We may change over to doing
1648 // more aggressive marshaling,
1649 // to get more accurate sizes.. this is more important once
1650 // users start using producer window
1651 // flow control.
1652 int size = msg.getSize();
1653 producerWindow.increaseUsage(size);
1654 }
1655 } else {
1656 if (sendTimeout > 0) {
1657 this.connection.syncSendPacket(msg,sendTimeout);
1658 }else {
1659 this.connection.syncSendPacket(msg);
1660 }
1661 }
1662
1663 }
1664 }
1665
1666 /**
1667 * Send TransactionInfo to indicate transaction has started
1668 *
1669 * @throws JMSException if some internal error occurs
1670 */
1671 protected void doStartTransaction() throws JMSException {
1672 if (getTransacted() && !transactionContext.isInXATransaction()) {
1673 transactionContext.begin();
1674 }
1675 }
1676
1677 /**
1678 * Checks whether the session has unconsumed messages.
1679 *
1680 * @return true - if there are unconsumed messages.
1681 */
1682 public boolean hasUncomsumedMessages() {
1683 return executor.hasUncomsumedMessages();
1684 }
1685
1686 /**
1687 * Checks whether the session uses transactions.
1688 *
1689 * @return true - if the session uses transactions.
1690 */
1691 public boolean isTransacted() {
1692 return this.acknowledgementMode == Session.SESSION_TRANSACTED;
1693 }
1694
1695 /**
1696 * Checks whether the session used client acknowledgment.
1697 *
1698 * @return true - if the session uses client acknowledgment.
1699 */
1700 protected boolean isClientAcknowledge() {
1701 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1702 }
1703
1704 /**
1705 * Checks whether the session used auto acknowledgment.
1706 *
1707 * @return true - if the session uses client acknowledgment.
1708 */
1709 public boolean isAutoAcknowledge() {
1710 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1711 }
1712
1713 /**
1714 * Checks whether the session used dup ok acknowledgment.
1715 *
1716 * @return true - if the session uses client acknowledgment.
1717 */
1718 public boolean isDupsOkAcknowledge() {
1719 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1720 }
1721
1722 public boolean isIndividualAcknowledge(){
1723 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1724 }
1725
1726 /**
1727 * Returns the message delivery listener.
1728 *
1729 * @return deliveryListener - message delivery listener.
1730 */
1731 public DeliveryListener getDeliveryListener() {
1732 return deliveryListener;
1733 }
1734
1735 /**
1736 * Sets the message delivery listener.
1737 *
1738 * @param deliveryListener - message delivery listener.
1739 */
1740 public void setDeliveryListener(DeliveryListener deliveryListener) {
1741 this.deliveryListener = deliveryListener;
1742 }
1743
1744 /**
1745 * Returns the SessionInfo bean.
1746 *
1747 * @return info - SessionInfo bean.
1748 * @throws JMSException
1749 */
1750 protected SessionInfo getSessionInfo() throws JMSException {
1751 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1752 return info;
1753 }
1754
1755 /**
1756 * Send the asynchronus command.
1757 *
1758 * @param command - command to be executed.
1759 * @throws JMSException
1760 */
1761 public void asyncSendPacket(Command command) throws JMSException {
1762 connection.asyncSendPacket(command);
1763 }
1764
1765 /**
1766 * Send the synchronus command.
1767 *
1768 * @param command - command to be executed.
1769 * @return Response
1770 * @throws JMSException
1771 */
1772 public Response syncSendPacket(Command command) throws JMSException {
1773 return connection.syncSendPacket(command);
1774 }
1775
1776 public long getNextDeliveryId() {
1777 return deliveryIdGenerator.getNextSequenceId();
1778 }
1779
1780 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1781
1782 List<MessageDispatch> c = unconsumedMessages.removeAll();
1783 for (MessageDispatch md : c) {
1784 this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1785 }
1786 Collections.reverse(c);
1787
1788 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1789 MessageDispatch md = iter.next();
1790 executor.executeFirst(md);
1791 }
1792
1793 }
1794
1795 public boolean isRunning() {
1796 return started.get();
1797 }
1798
1799 public boolean isAsyncDispatch() {
1800 return asyncDispatch;
1801 }
1802
1803 public void setAsyncDispatch(boolean asyncDispatch) {
1804 this.asyncDispatch = asyncDispatch;
1805 }
1806
1807 /**
1808 * @return Returns the sessionAsyncDispatch.
1809 */
1810 public boolean isSessionAsyncDispatch() {
1811 return sessionAsyncDispatch;
1812 }
1813
1814 /**
1815 * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1816 */
1817 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1818 this.sessionAsyncDispatch = sessionAsyncDispatch;
1819 }
1820
1821 public MessageTransformer getTransformer() {
1822 return transformer;
1823 }
1824
1825 public ActiveMQConnection getConnection() {
1826 return connection;
1827 }
1828
1829 /**
1830 * Sets the transformer used to transform messages before they are sent on
1831 * to the JMS bus or when they are received from the bus but before they are
1832 * delivered to the JMS client
1833 */
1834 public void setTransformer(MessageTransformer transformer) {
1835 this.transformer = transformer;
1836 }
1837
1838 public BlobTransferPolicy getBlobTransferPolicy() {
1839 return blobTransferPolicy;
1840 }
1841
1842 /**
1843 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1844 * OBjects) are transferred from producers to brokers to consumers
1845 */
1846 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1847 this.blobTransferPolicy = blobTransferPolicy;
1848 }
1849
1850 public List getUnconsumedMessages() {
1851 return executor.getUnconsumedMessages();
1852 }
1853
1854 public String toString() {
1855 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1856 }
1857
1858 public void checkMessageListener() throws JMSException {
1859 if (messageListener != null) {
1860 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1861 }
1862 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
1863 ActiveMQMessageConsumer consumer = i.next();
1864 if (consumer.getMessageListener() != null) {
1865 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1866 }
1867 }
1868 }
1869
1870 protected void setOptimizeAcknowledge(boolean value) {
1871 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1872 ActiveMQMessageConsumer c = iter.next();
1873 c.setOptimizeAcknowledge(value);
1874 }
1875 }
1876
1877 protected void setPrefetchSize(ConsumerId id, int prefetch) {
1878 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1879 ActiveMQMessageConsumer c = iter.next();
1880 if (c.getConsumerId().equals(id)) {
1881 c.setPrefetchSize(prefetch);
1882 break;
1883 }
1884 }
1885 }
1886
1887 protected void close(ConsumerId id) {
1888 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1889 ActiveMQMessageConsumer c = iter.next();
1890 if (c.getConsumerId().equals(id)) {
1891 try {
1892 c.close();
1893 } catch (JMSException e) {
1894 LOG.warn("Exception closing consumer", e);
1895 }
1896 LOG.warn("Closed consumer on Command");
1897 break;
1898 }
1899 }
1900 }
1901
1902 public boolean isInUse(ActiveMQTempDestination destination) {
1903 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1904 ActiveMQMessageConsumer c = iter.next();
1905 if (c.isInUse(destination)) {
1906 return true;
1907 }
1908 }
1909 return false;
1910 }
1911
1912 protected void sendAck(MessageAck ack) throws JMSException {
1913 sendAck(ack,false);
1914 }
1915
1916 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
1917 if (lazy || connection.isSendAcksAsync() || isTransacted()) {
1918 asyncSendPacket(ack);
1919 } else {
1920 syncSendPacket(ack);
1921 }
1922 }
1923
1924 }