001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.File;
020 import java.io.InputStream;
021 import java.io.Serializable;
022 import java.net.URL;
023 import java.util.Collections;
024 import java.util.Iterator;
025 import java.util.List;
026 import java.util.concurrent.CopyOnWriteArrayList;
027 import java.util.concurrent.ThreadPoolExecutor;
028 import java.util.concurrent.atomic.AtomicBoolean;
029
030 import javax.jms.BytesMessage;
031 import javax.jms.Destination;
032 import javax.jms.IllegalStateException;
033 import javax.jms.InvalidDestinationException;
034 import javax.jms.InvalidSelectorException;
035 import javax.jms.JMSException;
036 import javax.jms.MapMessage;
037 import javax.jms.Message;
038 import javax.jms.MessageConsumer;
039 import javax.jms.MessageListener;
040 import javax.jms.MessageProducer;
041 import javax.jms.ObjectMessage;
042 import javax.jms.Queue;
043 import javax.jms.QueueBrowser;
044 import javax.jms.QueueReceiver;
045 import javax.jms.QueueSender;
046 import javax.jms.QueueSession;
047 import javax.jms.Session;
048 import javax.jms.StreamMessage;
049 import javax.jms.TemporaryQueue;
050 import javax.jms.TemporaryTopic;
051 import javax.jms.TextMessage;
052 import javax.jms.Topic;
053 import javax.jms.TopicPublisher;
054 import javax.jms.TopicSession;
055 import javax.jms.TopicSubscriber;
056 import javax.jms.TransactionRolledBackException;
057
058 import org.apache.activemq.blob.BlobDownloader;
059 import org.apache.activemq.blob.BlobTransferPolicy;
060 import org.apache.activemq.blob.BlobUploader;
061 import org.apache.activemq.command.ActiveMQBlobMessage;
062 import org.apache.activemq.command.ActiveMQBytesMessage;
063 import org.apache.activemq.command.ActiveMQDestination;
064 import org.apache.activemq.command.ActiveMQMapMessage;
065 import org.apache.activemq.command.ActiveMQMessage;
066 import org.apache.activemq.command.ActiveMQObjectMessage;
067 import org.apache.activemq.command.ActiveMQQueue;
068 import org.apache.activemq.command.ActiveMQStreamMessage;
069 import org.apache.activemq.command.ActiveMQTempDestination;
070 import org.apache.activemq.command.ActiveMQTempQueue;
071 import org.apache.activemq.command.ActiveMQTempTopic;
072 import org.apache.activemq.command.ActiveMQTextMessage;
073 import org.apache.activemq.command.ActiveMQTopic;
074 import org.apache.activemq.command.Command;
075 import org.apache.activemq.command.ConsumerId;
076 import org.apache.activemq.command.MessageAck;
077 import org.apache.activemq.command.MessageDispatch;
078 import org.apache.activemq.command.MessageId;
079 import org.apache.activemq.command.ProducerId;
080 import org.apache.activemq.command.RemoveInfo;
081 import org.apache.activemq.command.Response;
082 import org.apache.activemq.command.SessionId;
083 import org.apache.activemq.command.SessionInfo;
084 import org.apache.activemq.command.TransactionId;
085 import org.apache.activemq.management.JMSSessionStatsImpl;
086 import org.apache.activemq.management.StatsCapable;
087 import org.apache.activemq.management.StatsImpl;
088 import org.apache.activemq.thread.Scheduler;
089 import org.apache.activemq.transaction.Synchronization;
090 import org.apache.activemq.usage.MemoryUsage;
091 import org.apache.activemq.util.Callback;
092 import org.apache.activemq.util.JMSExceptionSupport;
093 import org.apache.activemq.util.LongSequenceGenerator;
094 import org.slf4j.Logger;
095 import org.slf4j.LoggerFactory;
096
097 /**
098 * <P>
099 * A <CODE>Session</CODE> object is a single-threaded context for producing
100 * and consuming messages. Although it may allocate provider resources outside
101 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102 * <P>
103 * A session serves several purposes:
104 * <UL>
105 * <LI>It is a factory for its message producers and consumers.
106 * <LI>It supplies provider-optimized message factories.
107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108 * <CODE>TemporaryQueues</CODE>.
109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110 * objects for those clients that need to dynamically manipulate
111 * provider-specific destination names.
112 * <LI>It supports a single series of transactions that combine work spanning
113 * its producers and consumers into atomic units.
114 * <LI>It defines a serial order for the messages it consumes and the messages
115 * it produces.
116 * <LI>It retains messages it consumes until they have been acknowledged.
117 * <LI>It serializes execution of message listeners registered with its message
118 * consumers.
119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120 * </UL>
121 * <P>
122 * A session can create and service multiple message producers and consumers.
123 * <P>
124 * One typical use is to have a thread block on a synchronous
125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
127 * <P>
128 * If a client desires to have one thread produce messages while others consume
129 * them, the client should use a separate session for its producing thread.
130 * <P>
131 * Once a connection has been started, any session with one or more registered
132 * message listeners is dedicated to the thread of control that delivers
133 * messages to it. It is erroneous for client code to use this session or any of
134 * its constituent objects from another thread of control. The only exception to
135 * this rule is the use of the session or connection <CODE>close</CODE>
136 * method.
137 * <P>
138 * It should be easy for most clients to partition their work naturally into
139 * sessions. This model allows clients to start simply and incrementally add
140 * message processing complexity as their need for concurrency grows.
141 * <P>
142 * The <CODE>close</CODE> method is the only session method that can be called
143 * while some other session method is being executed in another thread.
144 * <P>
145 * A session may be specified as transacted. Each transacted session supports a
146 * single series of transactions. Each transaction groups a set of message sends
147 * and a set of message receives into an atomic unit of work. In effect,
148 * transactions organize a session's input message stream and output message
149 * stream into series of atomic units. When a transaction commits, its atomic
150 * unit of input is acknowledged and its associated atomic unit of output is
151 * sent. If a transaction rollback is done, the transaction's sent messages are
152 * destroyed and the session's input is automatically recovered.
153 * <P>
154 * The content of a transaction's input and output units is simply those
155 * messages that have been produced and consumed within the session's current
156 * transaction.
157 * <P>
158 * A transaction is completed using either its session's <CODE>commit</CODE>
159 * method or its session's <CODE>rollback </CODE> method. The completion of a
160 * session's current transaction automatically begins the next. The result is
161 * that a transacted session always has a current transaction within which its
162 * work is done.
163 * <P>
164 * The Java Transaction Service (JTS) or some other transaction monitor may be
165 * used to combine a session's transaction with transactions on other resources
166 * (databases, other JMS sessions, etc.). Since Java distributed transactions
167 * are controlled via the Java Transaction API (JTA), use of the session's
168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169 * prohibited.
170 * <P>
171 * The JMS API does not require support for JTA; however, it does define how a
172 * provider supplies this support.
173 * <P>
174 * Although it is also possible for a JMS client to handle distributed
175 * transactions directly, it is unlikely that many JMS clients will do this.
176 * Support for JTA in the JMS API is targeted at systems vendors who will be
177 * integrating the JMS API into their application server products.
178 *
179 *
180 * @see javax.jms.Session
181 * @see javax.jms.QueueSession
182 * @see javax.jms.TopicSession
183 * @see javax.jms.XASession
184 */
185 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186
187 /**
188 * Only acknowledge an individual message - using message.acknowledge()
189 * as opposed to CLIENT_ACKNOWLEDGE which
190 * acknowledges all messages consumed by a session at when acknowledge()
191 * is called
192 */
193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195
196 public static interface DeliveryListener {
197 void beforeDelivery(ActiveMQSession session, Message msg);
198
199 void afterDelivery(ActiveMQSession session, Message msg);
200 }
201
202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
203 private final ThreadPoolExecutor connectionExecutor;
204
205 protected int acknowledgementMode;
206 protected final ActiveMQConnection connection;
207 protected final SessionInfo info;
208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
211 protected final ActiveMQSessionExecutor executor;
212 protected final AtomicBoolean started = new AtomicBoolean(false);
213
214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
216
217 protected boolean closed;
218 private volatile boolean synchronizationRegistered;
219 protected boolean asyncDispatch;
220 protected boolean sessionAsyncDispatch;
221 protected final boolean debug;
222 protected Object sendMutex = new Object();
223
224 private MessageListener messageListener;
225 private final JMSSessionStatsImpl stats;
226 private TransactionContext transactionContext;
227 private DeliveryListener deliveryListener;
228 private MessageTransformer transformer;
229 private BlobTransferPolicy blobTransferPolicy;
230 private long lastDeliveredSequenceId;
231
232 /**
233 * Construct the Session
234 *
235 * @param connection
236 * @param sessionId
237 * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
238 * Session.SESSION_TRANSACTED
239 * @param asyncDispatch
240 * @param sessionAsyncDispatch
241 * @throws JMSException on internal error
242 */
243 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
244 this.debug = LOG.isDebugEnabled();
245 this.connection = connection;
246 this.acknowledgementMode = acknowledgeMode;
247 this.asyncDispatch = asyncDispatch;
248 this.sessionAsyncDispatch = sessionAsyncDispatch;
249 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
250 setTransactionContext(new TransactionContext(connection));
251 stats = new JMSSessionStatsImpl(producers, consumers);
252 this.connection.asyncSendPacket(info);
253 setTransformer(connection.getTransformer());
254 setBlobTransferPolicy(connection.getBlobTransferPolicy());
255 this.connectionExecutor=connection.getExecutor();
256 this.executor = new ActiveMQSessionExecutor(this);
257 connection.addSession(this);
258 if (connection.isStarted()) {
259 start();
260 }
261
262 }
263
264 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
265 this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
266 }
267
268 /**
269 * Sets the transaction context of the session.
270 *
271 * @param transactionContext - provides the means to control a JMS
272 * transaction.
273 */
274 public void setTransactionContext(TransactionContext transactionContext) {
275 this.transactionContext = transactionContext;
276 }
277
278 /**
279 * Returns the transaction context of the session.
280 *
281 * @return transactionContext - session's transaction context.
282 */
283 public TransactionContext getTransactionContext() {
284 return transactionContext;
285 }
286
287 /*
288 * (non-Javadoc)
289 *
290 * @see org.apache.activemq.management.StatsCapable#getStats()
291 */
292 public StatsImpl getStats() {
293 return stats;
294 }
295
296 /**
297 * Returns the session's statistics.
298 *
299 * @return stats - session's statistics.
300 */
301 public JMSSessionStatsImpl getSessionStats() {
302 return stats;
303 }
304
305 /**
306 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
307 * object is used to send a message containing a stream of uninterpreted
308 * bytes.
309 *
310 * @return the an ActiveMQBytesMessage
311 * @throws JMSException if the JMS provider fails to create this message due
312 * to some internal error.
313 */
314 public BytesMessage createBytesMessage() throws JMSException {
315 ActiveMQBytesMessage message = new ActiveMQBytesMessage();
316 configureMessage(message);
317 return message;
318 }
319
320 /**
321 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
322 * object is used to send a self-defining set of name-value pairs, where
323 * names are <CODE>String</CODE> objects and values are primitive values
324 * in the Java programming language.
325 *
326 * @return an ActiveMQMapMessage
327 * @throws JMSException if the JMS provider fails to create this message due
328 * to some internal error.
329 */
330 public MapMessage createMapMessage() throws JMSException {
331 ActiveMQMapMessage message = new ActiveMQMapMessage();
332 configureMessage(message);
333 return message;
334 }
335
336 /**
337 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
338 * interface is the root interface of all JMS messages. A
339 * <CODE>Message</CODE> object holds all the standard message header
340 * information. It can be sent when a message containing only header
341 * information is sufficient.
342 *
343 * @return an ActiveMQMessage
344 * @throws JMSException if the JMS provider fails to create this message due
345 * to some internal error.
346 */
347 public Message createMessage() throws JMSException {
348 ActiveMQMessage message = new ActiveMQMessage();
349 configureMessage(message);
350 return message;
351 }
352
353 /**
354 * Creates an <CODE>ObjectMessage</CODE> object. An
355 * <CODE>ObjectMessage</CODE> object is used to send a message that
356 * contains a serializable Java object.
357 *
358 * @return an ActiveMQObjectMessage
359 * @throws JMSException if the JMS provider fails to create this message due
360 * to some internal error.
361 */
362 public ObjectMessage createObjectMessage() throws JMSException {
363 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
364 configureMessage(message);
365 return message;
366 }
367
368 /**
369 * Creates an initialized <CODE>ObjectMessage</CODE> object. An
370 * <CODE>ObjectMessage</CODE> object is used to send a message that
371 * contains a serializable Java object.
372 *
373 * @param object the object to use to initialize this message
374 * @return an ActiveMQObjectMessage
375 * @throws JMSException if the JMS provider fails to create this message due
376 * to some internal error.
377 */
378 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
379 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
380 configureMessage(message);
381 message.setObject(object);
382 return message;
383 }
384
385 /**
386 * Creates a <CODE>StreamMessage</CODE> object. A
387 * <CODE>StreamMessage</CODE> object is used to send a self-defining
388 * stream of primitive values in the Java programming language.
389 *
390 * @return an ActiveMQStreamMessage
391 * @throws JMSException if the JMS provider fails to create this message due
392 * to some internal error.
393 */
394 public StreamMessage createStreamMessage() throws JMSException {
395 ActiveMQStreamMessage message = new ActiveMQStreamMessage();
396 configureMessage(message);
397 return message;
398 }
399
400 /**
401 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
402 * object is used to send a message containing a <CODE>String</CODE>
403 * object.
404 *
405 * @return an ActiveMQTextMessage
406 * @throws JMSException if the JMS provider fails to create this message due
407 * to some internal error.
408 */
409 public TextMessage createTextMessage() throws JMSException {
410 ActiveMQTextMessage message = new ActiveMQTextMessage();
411 configureMessage(message);
412 return message;
413 }
414
415 /**
416 * Creates an initialized <CODE>TextMessage</CODE> object. A
417 * <CODE>TextMessage</CODE> object is used to send a message containing a
418 * <CODE>String</CODE>.
419 *
420 * @param text the string used to initialize this message
421 * @return an ActiveMQTextMessage
422 * @throws JMSException if the JMS provider fails to create this message due
423 * to some internal error.
424 */
425 public TextMessage createTextMessage(String text) throws JMSException {
426 ActiveMQTextMessage message = new ActiveMQTextMessage();
427 message.setText(text);
428 configureMessage(message);
429 return message;
430 }
431
432 /**
433 * Creates an initialized <CODE>BlobMessage</CODE> object. A
434 * <CODE>BlobMessage</CODE> object is used to send a message containing a
435 * <CODE>URL</CODE> which points to some network addressible BLOB.
436 *
437 * @param url the network addressable URL used to pass directly to the
438 * consumer
439 * @return a BlobMessage
440 * @throws JMSException if the JMS provider fails to create this message due
441 * to some internal error.
442 */
443 public BlobMessage createBlobMessage(URL url) throws JMSException {
444 return createBlobMessage(url, false);
445 }
446
447 /**
448 * Creates an initialized <CODE>BlobMessage</CODE> object. A
449 * <CODE>BlobMessage</CODE> object is used to send a message containing a
450 * <CODE>URL</CODE> which points to some network addressible BLOB.
451 *
452 * @param url the network addressable URL used to pass directly to the
453 * consumer
454 * @param deletedByBroker indicates whether or not the resource is deleted
455 * by the broker when the message is acknowledged
456 * @return a BlobMessage
457 * @throws JMSException if the JMS provider fails to create this message due
458 * to some internal error.
459 */
460 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
461 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
462 configureMessage(message);
463 message.setURL(url);
464 message.setDeletedByBroker(deletedByBroker);
465 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
466 return message;
467 }
468
469 /**
470 * Creates an initialized <CODE>BlobMessage</CODE> object. A
471 * <CODE>BlobMessage</CODE> object is used to send a message containing
472 * the <CODE>File</CODE> content. Before the message is sent the file
473 * conent will be uploaded to the broker or some other remote repository
474 * depending on the {@link #getBlobTransferPolicy()}.
475 *
476 * @param file the file to be uploaded to some remote repo (or the broker)
477 * depending on the strategy
478 * @return a BlobMessage
479 * @throws JMSException if the JMS provider fails to create this message due
480 * to some internal error.
481 */
482 public BlobMessage createBlobMessage(File file) throws JMSException {
483 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
484 configureMessage(message);
485 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
486 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
487 message.setDeletedByBroker(true);
488 message.setName(file.getName());
489 return message;
490 }
491
492 /**
493 * Creates an initialized <CODE>BlobMessage</CODE> object. A
494 * <CODE>BlobMessage</CODE> object is used to send a message containing
495 * the <CODE>File</CODE> content. Before the message is sent the file
496 * conent will be uploaded to the broker or some other remote repository
497 * depending on the {@link #getBlobTransferPolicy()}.
498 *
499 * @param in the stream to be uploaded to some remote repo (or the broker)
500 * depending on the strategy
501 * @return a BlobMessage
502 * @throws JMSException if the JMS provider fails to create this message due
503 * to some internal error.
504 */
505 public BlobMessage createBlobMessage(InputStream in) throws JMSException {
506 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
507 configureMessage(message);
508 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
509 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
510 message.setDeletedByBroker(true);
511 return message;
512 }
513
514 /**
515 * Indicates whether the session is in transacted mode.
516 *
517 * @return true if the session is in transacted mode
518 * @throws JMSException if there is some internal error.
519 */
520 public boolean getTransacted() throws JMSException {
521 checkClosed();
522 return isTransacted();
523 }
524
525 /**
526 * Returns the acknowledgement mode of the session. The acknowledgement mode
527 * is set at the time that the session is created. If the session is
528 * transacted, the acknowledgement mode is ignored.
529 *
530 * @return If the session is not transacted, returns the current
531 * acknowledgement mode for the session. If the session is
532 * transacted, returns SESSION_TRANSACTED.
533 * @throws JMSException
534 * @see javax.jms.Connection#createSession(boolean,int)
535 * @since 1.1 exception JMSException if there is some internal error.
536 */
537 public int getAcknowledgeMode() throws JMSException {
538 checkClosed();
539 return this.acknowledgementMode;
540 }
541
542 /**
543 * Commits all messages done in this transaction and releases any locks
544 * currently held.
545 *
546 * @throws JMSException if the JMS provider fails to commit the transaction
547 * due to some internal error.
548 * @throws TransactionRolledBackException if the transaction is rolled back
549 * due to some internal error during commit.
550 * @throws javax.jms.IllegalStateException if the method is not called by a
551 * transacted session.
552 */
553 public void commit() throws JMSException {
554 checkClosed();
555 if (!getTransacted()) {
556 throw new javax.jms.IllegalStateException("Not a transacted session");
557 }
558 if (LOG.isDebugEnabled()) {
559 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
560 }
561 transactionContext.commit();
562 }
563
564 /**
565 * Rolls back any messages done in this transaction and releases any locks
566 * currently held.
567 *
568 * @throws JMSException if the JMS provider fails to roll back the
569 * transaction due to some internal error.
570 * @throws javax.jms.IllegalStateException if the method is not called by a
571 * transacted session.
572 */
573 public void rollback() throws JMSException {
574 checkClosed();
575 if (!getTransacted()) {
576 throw new javax.jms.IllegalStateException("Not a transacted session");
577 }
578 if (LOG.isDebugEnabled()) {
579 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId());
580 }
581 transactionContext.rollback();
582 }
583
584 /**
585 * Closes the session.
586 * <P>
587 * Since a provider may allocate some resources on behalf of a session
588 * outside the JVM, clients should close the resources when they are not
589 * needed. Relying on garbage collection to eventually reclaim these
590 * resources may not be timely enough.
591 * <P>
592 * There is no need to close the producers and consumers of a closed
593 * session.
594 * <P>
595 * This call will block until a <CODE>receive</CODE> call or message
596 * listener in progress has completed. A blocked message consumer
597 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
598 * is closed.
599 * <P>
600 * Closing a transacted session must roll back the transaction in progress.
601 * <P>
602 * This method is the only <CODE>Session</CODE> method that can be called
603 * concurrently.
604 * <P>
605 * Invoking any other <CODE>Session</CODE> method on a closed session must
606 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
607 * closed session must <I>not </I> throw an exception.
608 *
609 * @throws JMSException if the JMS provider fails to close the session due
610 * to some internal error.
611 */
612 public void close() throws JMSException {
613 if (!closed) {
614 if (getTransactionContext().isInXATransaction()) {
615 if (!synchronizationRegistered) {
616 synchronizationRegistered = true;
617 getTransactionContext().addSynchronization(new Synchronization() {
618
619 @Override
620 public void afterCommit() throws Exception {
621 doClose();
622 synchronizationRegistered = false;
623 }
624
625 @Override
626 public void afterRollback() throws Exception {
627 doClose();
628 synchronizationRegistered = false;
629 }
630 });
631 }
632
633 } else {
634 doClose();
635 }
636 }
637 }
638
639 private void doClose() throws JMSException {
640 boolean interrupted = Thread.interrupted();
641 dispose();
642 RemoveInfo removeCommand = info.createRemoveCommand();
643 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
644 connection.asyncSendPacket(removeCommand);
645 if (interrupted) {
646 Thread.currentThread().interrupt();
647 }
648 }
649
650 void clearMessagesInProgress() {
651 executor.clearMessagesInProgress();
652 // we are called from inside the transport reconnection logic
653 // which involves us clearing all the connections' consumers
654 // dispatch and delivered lists. So rather than trying to
655 // grab a mutex (which could be already owned by the message
656 // listener calling the send or an ack) we allow it to complete in
657 // a separate thread via the scheduler and notify us via
658 // connection.transportInterruptionProcessingComplete()
659 //
660 for (final ActiveMQMessageConsumer consumer : consumers) {
661 consumer.inProgressClearRequired();
662 try {
663 connection.getScheduler().executeAfterDelay(new Runnable() {
664 public void run() {
665 consumer.clearMessagesInProgress();
666 }}, 0l);
667 } catch (JMSException e) {
668 connection.onClientInternalException(e);
669 }
670 }
671 }
672
673 void deliverAcks() {
674 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
675 ActiveMQMessageConsumer consumer = iter.next();
676 consumer.deliverAcks();
677 }
678 }
679
680 public synchronized void dispose() throws JMSException {
681 if (!closed) {
682
683 try {
684 executor.stop();
685
686 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
687 ActiveMQMessageConsumer consumer = iter.next();
688 consumer.setFailureError(connection.getFirstFailureError());
689 consumer.dispose();
690 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
691 }
692 consumers.clear();
693
694 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
695 ActiveMQMessageProducer producer = iter.next();
696 producer.dispose();
697 }
698 producers.clear();
699
700 try {
701 if (getTransactionContext().isInLocalTransaction()) {
702 rollback();
703 }
704 } catch (JMSException e) {
705 }
706
707 } finally {
708 connection.removeSession(this);
709 this.transactionContext = null;
710 closed = true;
711 }
712 }
713 }
714
715 /**
716 * Checks that the session is not closed then configures the message
717 */
718 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
719 checkClosed();
720 message.setConnection(connection);
721 }
722
723 /**
724 * Check if the session is closed. It is used for ensuring that the session
725 * is open before performing various operations.
726 *
727 * @throws IllegalStateException if the Session is closed
728 */
729 protected void checkClosed() throws IllegalStateException {
730 if (closed) {
731 throw new IllegalStateException("The Session is closed");
732 }
733 }
734
735 /**
736 * Checks if the session is closed.
737 *
738 * @return true if the session is closed, false otherwise.
739 */
740 public boolean isClosed() {
741 return closed;
742 }
743
744 /**
745 * Stops message delivery in this session, and restarts message delivery
746 * with the oldest unacknowledged message.
747 * <P>
748 * All consumers deliver messages in a serial order. Acknowledging a
749 * received message automatically acknowledges all messages that have been
750 * delivered to the client.
751 * <P>
752 * Restarting a session causes it to take the following actions:
753 * <UL>
754 * <LI>Stop message delivery
755 * <LI>Mark all messages that might have been delivered but not
756 * acknowledged as "redelivered"
757 * <LI>Restart the delivery sequence including all unacknowledged messages
758 * that had been previously delivered. Redelivered messages do not have to
759 * be delivered in exactly their original delivery order.
760 * </UL>
761 *
762 * @throws JMSException if the JMS provider fails to stop and restart
763 * message delivery due to some internal error.
764 * @throws IllegalStateException if the method is called by a transacted
765 * session.
766 */
767 public void recover() throws JMSException {
768
769 checkClosed();
770 if (getTransacted()) {
771 throw new IllegalStateException("This session is transacted");
772 }
773
774 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
775 ActiveMQMessageConsumer c = iter.next();
776 c.rollback();
777 }
778
779 }
780
781 /**
782 * Returns the session's distinguished message listener (optional).
783 *
784 * @return the message listener associated with this session
785 * @throws JMSException if the JMS provider fails to get the message
786 * listener due to an internal error.
787 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
788 * @see javax.jms.ServerSessionPool
789 * @see javax.jms.ServerSession
790 */
791 public MessageListener getMessageListener() throws JMSException {
792 checkClosed();
793 return this.messageListener;
794 }
795
796 /**
797 * Sets the session's distinguished message listener (optional).
798 * <P>
799 * When the distinguished message listener is set, no other form of message
800 * receipt in the session can be used; however, all forms of sending
801 * messages are still supported.
802 * <P>
803 * If this session has been closed, then an {@link IllegalStateException} is
804 * thrown, if trying to set a new listener. However setting the listener
805 * to <tt>null</tt> is allowed, to clear the listener, even if this session
806 * has been closed prior.
807 * <P>
808 * This is an expert facility not used by regular JMS clients.
809 *
810 * @param listener the message listener to associate with this session
811 * @throws JMSException if the JMS provider fails to set the message
812 * listener due to an internal error.
813 * @see javax.jms.Session#getMessageListener()
814 * @see javax.jms.ServerSessionPool
815 * @see javax.jms.ServerSession
816 */
817 public void setMessageListener(MessageListener listener) throws JMSException {
818 // only check for closed if we set a new listener, as we allow to clear
819 // the listener, such as when an application is shutting down, and is
820 // no longer using a message listener on this session
821 if (listener != null) {
822 checkClosed();
823 }
824 this.messageListener = listener;
825
826 if (listener != null) {
827 executor.setDispatchedBySessionPool(true);
828 }
829 }
830
831 /**
832 * Optional operation, intended to be used only by Application Servers, not
833 * by ordinary JMS clients.
834 *
835 * @see javax.jms.ServerSession
836 */
837 public void run() {
838 MessageDispatch messageDispatch;
839 while ((messageDispatch = executor.dequeueNoWait()) != null) {
840 final MessageDispatch md = messageDispatch;
841 ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
842 if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
843 // TODO: Ack it without delivery to client
844 continue;
845 }
846
847 if (isClientAcknowledge()||isIndividualAcknowledge()) {
848 message.setAcknowledgeCallback(new Callback() {
849 public void execute() throws Exception {
850 }
851 });
852 }
853
854 if (deliveryListener != null) {
855 deliveryListener.beforeDelivery(this, message);
856 }
857
858 md.setDeliverySequenceId(getNextDeliveryId());
859
860 try {
861 messageListener.onMessage(message);
862 } catch (RuntimeException e) {
863 LOG.error("error dispatching message: ", e);
864 // A problem while invoking the MessageListener does not
865 // in general indicate a problem with the connection to the broker, i.e.
866 // it will usually be sufficient to let the afterDelivery() method either
867 // commit or roll back in order to deal with the exception.
868 // However, we notify any registered client internal exception listener
869 // of the problem.
870 connection.onClientInternalException(e);
871 }
872
873 try {
874 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
875 ack.setFirstMessageId(md.getMessage().getMessageId());
876 doStartTransaction();
877 ack.setTransactionId(getTransactionContext().getTransactionId());
878 if (ack.getTransactionId() != null) {
879 getTransactionContext().addSynchronization(new Synchronization() {
880
881 @Override
882 public void afterRollback() throws Exception {
883 md.getMessage().onMessageRolledBack();
884 // ensure we don't filter this as a duplicate
885 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
886 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
887 int redeliveryCounter = md.getMessage().getRedeliveryCounter();
888 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
889 && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
890 // We need to NACK the messages so that they get
891 // sent to the
892 // DLQ.
893 // Acknowledge the last message.
894 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
895 ack.setFirstMessageId(md.getMessage().getMessageId());
896 asyncSendPacket(ack);
897 } else {
898
899 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
900 ack.setFirstMessageId(md.getMessage().getMessageId());
901 asyncSendPacket(ack);
902
903 // Figure out how long we should wait to resend
904 // this message.
905 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
906 for (int i = 0; i < redeliveryCounter; i++) {
907 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
908 }
909 connection.getScheduler().executeAfterDelay(new Runnable() {
910
911 public void run() {
912 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
913 }
914 }, redeliveryDelay);
915 }
916 }
917 });
918 }
919 asyncSendPacket(ack);
920 } catch (Throwable e) {
921 connection.onClientInternalException(e);
922 }
923
924 if (deliveryListener != null) {
925 deliveryListener.afterDelivery(this, message);
926 }
927 }
928 }
929
930 /**
931 * Creates a <CODE>MessageProducer</CODE> to send messages to the
932 * specified destination.
933 * <P>
934 * A client uses a <CODE>MessageProducer</CODE> object to send messages to
935 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
936 * inherit from <CODE>Destination</CODE>, they can be used in the
937 * destination parameter to create a <CODE>MessageProducer</CODE> object.
938 *
939 * @param destination the <CODE>Destination</CODE> to send to, or null if
940 * this is a producer which does not have a specified
941 * destination.
942 * @return the MessageProducer
943 * @throws JMSException if the session fails to create a MessageProducer due
944 * to some internal error.
945 * @throws InvalidDestinationException if an invalid destination is
946 * specified.
947 * @since 1.1
948 */
949 public MessageProducer createProducer(Destination destination) throws JMSException {
950 checkClosed();
951 if (destination instanceof CustomDestination) {
952 CustomDestination customDestination = (CustomDestination)destination;
953 return customDestination.createProducer(this);
954 }
955 int timeSendOut = connection.getSendTimeout();
956 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
957 }
958
959 /**
960 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
961 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
962 * <CODE>Destination</CODE>, they can be used in the destination
963 * parameter to create a <CODE>MessageConsumer</CODE>.
964 *
965 * @param destination the <CODE>Destination</CODE> to access.
966 * @return the MessageConsumer
967 * @throws JMSException if the session fails to create a consumer due to
968 * some internal error.
969 * @throws InvalidDestinationException if an invalid destination is
970 * specified.
971 * @since 1.1
972 */
973 public MessageConsumer createConsumer(Destination destination) throws JMSException {
974 return createConsumer(destination, (String) null);
975 }
976
977 /**
978 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
979 * using a message selector. Since <CODE> Queue</CODE> and
980 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
981 * can be used in the destination parameter to create a
982 * <CODE>MessageConsumer</CODE>.
983 * <P>
984 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
985 * that have been sent to a destination.
986 *
987 * @param destination the <CODE>Destination</CODE> to access
988 * @param messageSelector only messages with properties matching the message
989 * selector expression are delivered. A value of null or an
990 * empty string indicates that there is no message selector
991 * for the message consumer.
992 * @return the MessageConsumer
993 * @throws JMSException if the session fails to create a MessageConsumer due
994 * to some internal error.
995 * @throws InvalidDestinationException if an invalid destination is
996 * specified.
997 * @throws InvalidSelectorException if the message selector is invalid.
998 * @since 1.1
999 */
1000 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
1001 return createConsumer(destination, messageSelector, false);
1002 }
1003
1004 /**
1005 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1006 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1007 * <CODE>Destination</CODE>, they can be used in the destination
1008 * parameter to create a <CODE>MessageConsumer</CODE>.
1009 *
1010 * @param destination the <CODE>Destination</CODE> to access.
1011 * @param messageListener the listener to use for async consumption of messages
1012 * @return the MessageConsumer
1013 * @throws JMSException if the session fails to create a consumer due to
1014 * some internal error.
1015 * @throws InvalidDestinationException if an invalid destination is
1016 * specified.
1017 * @since 1.1
1018 */
1019 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1020 return createConsumer(destination, null, messageListener);
1021 }
1022
1023 /**
1024 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1025 * using a message selector. Since <CODE> Queue</CODE> and
1026 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1027 * can be used in the destination parameter to create a
1028 * <CODE>MessageConsumer</CODE>.
1029 * <P>
1030 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1031 * that have been sent to a destination.
1032 *
1033 * @param destination the <CODE>Destination</CODE> to access
1034 * @param messageSelector only messages with properties matching the message
1035 * selector expression are delivered. A value of null or an
1036 * empty string indicates that there is no message selector
1037 * for the message consumer.
1038 * @param messageListener the listener to use for async consumption of messages
1039 * @return the MessageConsumer
1040 * @throws JMSException if the session fails to create a MessageConsumer due
1041 * to some internal error.
1042 * @throws InvalidDestinationException if an invalid destination is
1043 * specified.
1044 * @throws InvalidSelectorException if the message selector is invalid.
1045 * @since 1.1
1046 */
1047 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1048 return createConsumer(destination, messageSelector, false, messageListener);
1049 }
1050
1051 /**
1052 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1053 * using a message selector. This method can specify whether messages
1054 * published by its own connection should be delivered to it, if the
1055 * destination is a topic.
1056 * <P>
1057 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1058 * <CODE>Destination</CODE>, they can be used in the destination
1059 * parameter to create a <CODE>MessageConsumer</CODE>.
1060 * <P>
1061 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1062 * that have been published to a destination.
1063 * <P>
1064 * In some cases, a connection may both publish and subscribe to a topic.
1065 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1066 * inhibit the delivery of messages published by its own connection. The
1067 * default value for this attribute is False. The <CODE>noLocal</CODE>
1068 * value must be supported by destinations that are topics.
1069 *
1070 * @param destination the <CODE>Destination</CODE> to access
1071 * @param messageSelector only messages with properties matching the message
1072 * selector expression are delivered. A value of null or an
1073 * empty string indicates that there is no message selector
1074 * for the message consumer.
1075 * @param noLocal - if true, and the destination is a topic, inhibits the
1076 * delivery of messages published by its own connection. The
1077 * behavior for <CODE>NoLocal</CODE> is not specified if
1078 * the destination is a queue.
1079 * @return the MessageConsumer
1080 * @throws JMSException if the session fails to create a MessageConsumer due
1081 * to some internal error.
1082 * @throws InvalidDestinationException if an invalid destination is
1083 * specified.
1084 * @throws InvalidSelectorException if the message selector is invalid.
1085 * @since 1.1
1086 */
1087 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1088 return createConsumer(destination, messageSelector, noLocal, null);
1089 }
1090
1091 /**
1092 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1093 * using a message selector. This method can specify whether messages
1094 * published by its own connection should be delivered to it, if the
1095 * destination is a topic.
1096 * <P>
1097 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1098 * <CODE>Destination</CODE>, they can be used in the destination
1099 * parameter to create a <CODE>MessageConsumer</CODE>.
1100 * <P>
1101 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1102 * that have been published to a destination.
1103 * <P>
1104 * In some cases, a connection may both publish and subscribe to a topic.
1105 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1106 * inhibit the delivery of messages published by its own connection. The
1107 * default value for this attribute is False. The <CODE>noLocal</CODE>
1108 * value must be supported by destinations that are topics.
1109 *
1110 * @param destination the <CODE>Destination</CODE> to access
1111 * @param messageSelector only messages with properties matching the message
1112 * selector expression are delivered. A value of null or an
1113 * empty string indicates that there is no message selector
1114 * for the message consumer.
1115 * @param noLocal - if true, and the destination is a topic, inhibits the
1116 * delivery of messages published by its own connection. The
1117 * behavior for <CODE>NoLocal</CODE> is not specified if
1118 * the destination is a queue.
1119 * @param messageListener the listener to use for async consumption of messages
1120 * @return the MessageConsumer
1121 * @throws JMSException if the session fails to create a MessageConsumer due
1122 * to some internal error.
1123 * @throws InvalidDestinationException if an invalid destination is
1124 * specified.
1125 * @throws InvalidSelectorException if the message selector is invalid.
1126 * @since 1.1
1127 */
1128 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1129 checkClosed();
1130
1131 if (destination instanceof CustomDestination) {
1132 CustomDestination customDestination = (CustomDestination)destination;
1133 return customDestination.createConsumer(this, messageSelector, noLocal);
1134 }
1135
1136 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1137 int prefetch = 0;
1138 if (destination instanceof Topic) {
1139 prefetch = prefetchPolicy.getTopicPrefetch();
1140 } else {
1141 prefetch = prefetchPolicy.getQueuePrefetch();
1142 }
1143 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1144 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1145 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1146 }
1147
1148 /**
1149 * Creates a queue identity given a <CODE>Queue</CODE> name.
1150 * <P>
1151 * This facility is provided for the rare cases where clients need to
1152 * dynamically manipulate queue identity. It allows the creation of a queue
1153 * identity with a provider-specific name. Clients that depend on this
1154 * ability are not portable.
1155 * <P>
1156 * Note that this method is not for creating the physical queue. The
1157 * physical creation of queues is an administrative task and is not to be
1158 * initiated by the JMS API. The one exception is the creation of temporary
1159 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1160 * method.
1161 *
1162 * @param queueName the name of this <CODE>Queue</CODE>
1163 * @return a <CODE>Queue</CODE> with the given name
1164 * @throws JMSException if the session fails to create a queue due to some
1165 * internal error.
1166 * @since 1.1
1167 */
1168 public Queue createQueue(String queueName) throws JMSException {
1169 checkClosed();
1170 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1171 return new ActiveMQTempQueue(queueName);
1172 }
1173 return new ActiveMQQueue(queueName);
1174 }
1175
1176 /**
1177 * Creates a topic identity given a <CODE>Topic</CODE> name.
1178 * <P>
1179 * This facility is provided for the rare cases where clients need to
1180 * dynamically manipulate topic identity. This allows the creation of a
1181 * topic identity with a provider-specific name. Clients that depend on this
1182 * ability are not portable.
1183 * <P>
1184 * Note that this method is not for creating the physical topic. The
1185 * physical creation of topics is an administrative task and is not to be
1186 * initiated by the JMS API. The one exception is the creation of temporary
1187 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1188 * method.
1189 *
1190 * @param topicName the name of this <CODE>Topic</CODE>
1191 * @return a <CODE>Topic</CODE> with the given name
1192 * @throws JMSException if the session fails to create a topic due to some
1193 * internal error.
1194 * @since 1.1
1195 */
1196 public Topic createTopic(String topicName) throws JMSException {
1197 checkClosed();
1198 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1199 return new ActiveMQTempTopic(topicName);
1200 }
1201 return new ActiveMQTopic(topicName);
1202 }
1203
1204 /**
1205 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1206 * the specified queue.
1207 *
1208 * @param queue the <CODE>queue</CODE> to access
1209 * @exception InvalidDestinationException if an invalid destination is
1210 * specified
1211 * @since 1.1
1212 */
1213 /**
1214 * Creates a durable subscriber to the specified topic.
1215 * <P>
1216 * If a client needs to receive all the messages published on a topic,
1217 * including the ones published while the subscriber is inactive, it uses a
1218 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1219 * record of this durable subscription and insures that all messages from
1220 * the topic's publishers are retained until they are acknowledged by this
1221 * durable subscriber or they have expired.
1222 * <P>
1223 * Sessions with durable subscribers must always provide the same client
1224 * identifier. In addition, each client must specify a name that uniquely
1225 * identifies (within client identifier) each durable subscription it
1226 * creates. Only one session at a time can have a
1227 * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1228 * <P>
1229 * A client can change an existing durable subscription by creating a
1230 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1231 * and/or message selector. Changing a durable subscriber is equivalent to
1232 * unsubscribing (deleting) the old one and creating a new one.
1233 * <P>
1234 * In some cases, a connection may both publish and subscribe to a topic.
1235 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1236 * inhibit the delivery of messages published by its own connection. The
1237 * default value for this attribute is false.
1238 *
1239 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1240 * @param name the name used to identify this subscription
1241 * @return the TopicSubscriber
1242 * @throws JMSException if the session fails to create a subscriber due to
1243 * some internal error.
1244 * @throws InvalidDestinationException if an invalid topic is specified.
1245 * @since 1.1
1246 */
1247 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1248 checkClosed();
1249 return createDurableSubscriber(topic, name, null, false);
1250 }
1251
1252 /**
1253 * Creates a durable subscriber to the specified topic, using a message
1254 * selector and specifying whether messages published by its own connection
1255 * should be delivered to it.
1256 * <P>
1257 * If a client needs to receive all the messages published on a topic,
1258 * including the ones published while the subscriber is inactive, it uses a
1259 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1260 * record of this durable subscription and insures that all messages from
1261 * the topic's publishers are retained until they are acknowledged by this
1262 * durable subscriber or they have expired.
1263 * <P>
1264 * Sessions with durable subscribers must always provide the same client
1265 * identifier. In addition, each client must specify a name which uniquely
1266 * identifies (within client identifier) each durable subscription it
1267 * creates. Only one session at a time can have a
1268 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1269 * inactive durable subscriber is one that exists but does not currently
1270 * have a message consumer associated with it.
1271 * <P>
1272 * A client can change an existing durable subscription by creating a
1273 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1274 * and/or message selector. Changing a durable subscriber is equivalent to
1275 * unsubscribing (deleting) the old one and creating a new one.
1276 *
1277 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1278 * @param name the name used to identify this subscription
1279 * @param messageSelector only messages with properties matching the message
1280 * selector expression are delivered. A value of null or an
1281 * empty string indicates that there is no message selector
1282 * for the message consumer.
1283 * @param noLocal if set, inhibits the delivery of messages published by its
1284 * own connection
1285 * @return the Queue Browser
1286 * @throws JMSException if the session fails to create a subscriber due to
1287 * some internal error.
1288 * @throws InvalidDestinationException if an invalid topic is specified.
1289 * @throws InvalidSelectorException if the message selector is invalid.
1290 * @since 1.1
1291 */
1292 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1293 checkClosed();
1294
1295 if (isIndividualAcknowledge()) {
1296 throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
1297 "INDIVIDUAL_ACKNOWLEDGE mode.", null);
1298 }
1299
1300 if (topic instanceof CustomDestination) {
1301 CustomDestination customDestination = (CustomDestination)topic;
1302 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1303 }
1304
1305 connection.checkClientIDWasManuallySpecified();
1306 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1307 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1308 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1309 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1310 noLocal, false, asyncDispatch);
1311 }
1312
1313 /**
1314 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1315 * the specified queue.
1316 *
1317 * @param queue the <CODE>queue</CODE> to access
1318 * @return the Queue Browser
1319 * @throws JMSException if the session fails to create a browser due to some
1320 * internal error.
1321 * @throws InvalidDestinationException if an invalid destination is
1322 * specified
1323 * @since 1.1
1324 */
1325 public QueueBrowser createBrowser(Queue queue) throws JMSException {
1326 checkClosed();
1327 return createBrowser(queue, null);
1328 }
1329
1330 /**
1331 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1332 * the specified queue using a message selector.
1333 *
1334 * @param queue the <CODE>queue</CODE> to access
1335 * @param messageSelector only messages with properties matching the message
1336 * selector expression are delivered. A value of null or an
1337 * empty string indicates that there is no message selector
1338 * for the message consumer.
1339 * @return the Queue Browser
1340 * @throws JMSException if the session fails to create a browser due to some
1341 * internal error.
1342 * @throws InvalidDestinationException if an invalid destination is
1343 * specified
1344 * @throws InvalidSelectorException if the message selector is invalid.
1345 * @since 1.1
1346 */
1347 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1348 checkClosed();
1349 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1350 }
1351
1352 /**
1353 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1354 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1355 *
1356 * @return a temporary queue identity
1357 * @throws JMSException if the session fails to create a temporary queue due
1358 * to some internal error.
1359 * @since 1.1
1360 */
1361 public TemporaryQueue createTemporaryQueue() throws JMSException {
1362 checkClosed();
1363 return (TemporaryQueue)connection.createTempDestination(false);
1364 }
1365
1366 /**
1367 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1368 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1369 *
1370 * @return a temporary topic identity
1371 * @throws JMSException if the session fails to create a temporary topic due
1372 * to some internal error.
1373 * @since 1.1
1374 */
1375 public TemporaryTopic createTemporaryTopic() throws JMSException {
1376 checkClosed();
1377 return (TemporaryTopic)connection.createTempDestination(true);
1378 }
1379
1380 /**
1381 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1382 * the specified queue.
1383 *
1384 * @param queue the <CODE>Queue</CODE> to access
1385 * @return
1386 * @throws JMSException if the session fails to create a receiver due to
1387 * some internal error.
1388 * @throws JMSException
1389 * @throws InvalidDestinationException if an invalid queue is specified.
1390 */
1391 public QueueReceiver createReceiver(Queue queue) throws JMSException {
1392 checkClosed();
1393 return createReceiver(queue, null);
1394 }
1395
1396 /**
1397 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1398 * the specified queue using a message selector.
1399 *
1400 * @param queue the <CODE>Queue</CODE> to access
1401 * @param messageSelector only messages with properties matching the message
1402 * selector expression are delivered. A value of null or an
1403 * empty string indicates that there is no message selector
1404 * for the message consumer.
1405 * @return QueueReceiver
1406 * @throws JMSException if the session fails to create a receiver due to
1407 * some internal error.
1408 * @throws InvalidDestinationException if an invalid queue is specified.
1409 * @throws InvalidSelectorException if the message selector is invalid.
1410 */
1411 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1412 checkClosed();
1413
1414 if (queue instanceof CustomDestination) {
1415 CustomDestination customDestination = (CustomDestination)queue;
1416 return customDestination.createReceiver(this, messageSelector);
1417 }
1418
1419 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1420 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1421 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1422 }
1423
1424 /**
1425 * Creates a <CODE>QueueSender</CODE> object to send messages to the
1426 * specified queue.
1427 *
1428 * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1429 * unidentified producer
1430 * @return QueueSender
1431 * @throws JMSException if the session fails to create a sender due to some
1432 * internal error.
1433 * @throws InvalidDestinationException if an invalid queue is specified.
1434 */
1435 public QueueSender createSender(Queue queue) throws JMSException {
1436 checkClosed();
1437 if (queue instanceof CustomDestination) {
1438 CustomDestination customDestination = (CustomDestination)queue;
1439 return customDestination.createSender(this);
1440 }
1441 int timeSendOut = connection.getSendTimeout();
1442 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1443 }
1444
1445 /**
1446 * Creates a nondurable subscriber to the specified topic. <p/>
1447 * <P>
1448 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1449 * that have been published to a topic. <p/>
1450 * <P>
1451 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1452 * receive only messages that are published while they are active. <p/>
1453 * <P>
1454 * In some cases, a connection may both publish and subscribe to a topic.
1455 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1456 * inhibit the delivery of messages published by its own connection. The
1457 * default value for this attribute is false.
1458 *
1459 * @param topic the <CODE>Topic</CODE> to subscribe to
1460 * @return TopicSubscriber
1461 * @throws JMSException if the session fails to create a subscriber due to
1462 * some internal error.
1463 * @throws InvalidDestinationException if an invalid topic is specified.
1464 */
1465 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1466 checkClosed();
1467 return createSubscriber(topic, null, false);
1468 }
1469
1470 /**
1471 * Creates a nondurable subscriber to the specified topic, using a message
1472 * selector or specifying whether messages published by its own connection
1473 * should be delivered to it. <p/>
1474 * <P>
1475 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1476 * that have been published to a topic. <p/>
1477 * <P>
1478 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1479 * receive only messages that are published while they are active. <p/>
1480 * <P>
1481 * Messages filtered out by a subscriber's message selector will never be
1482 * delivered to the subscriber. From the subscriber's perspective, they do
1483 * not exist. <p/>
1484 * <P>
1485 * In some cases, a connection may both publish and subscribe to a topic.
1486 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1487 * inhibit the delivery of messages published by its own connection. The
1488 * default value for this attribute is false.
1489 *
1490 * @param topic the <CODE>Topic</CODE> to subscribe to
1491 * @param messageSelector only messages with properties matching the message
1492 * selector expression are delivered. A value of null or an
1493 * empty string indicates that there is no message selector
1494 * for the message consumer.
1495 * @param noLocal if set, inhibits the delivery of messages published by its
1496 * own connection
1497 * @return TopicSubscriber
1498 * @throws JMSException if the session fails to create a subscriber due to
1499 * some internal error.
1500 * @throws InvalidDestinationException if an invalid topic is specified.
1501 * @throws InvalidSelectorException if the message selector is invalid.
1502 */
1503 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1504 checkClosed();
1505
1506 if (topic instanceof CustomDestination) {
1507 CustomDestination customDestination = (CustomDestination)topic;
1508 return customDestination.createSubscriber(this, messageSelector, noLocal);
1509 }
1510
1511 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1512 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1513 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1514 }
1515
1516 /**
1517 * Creates a publisher for the specified topic. <p/>
1518 * <P>
1519 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1520 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1521 * a topic, it defines a new sequence of messages that have no ordering
1522 * relationship with the messages it has previously sent.
1523 *
1524 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1525 * an unidentified producer
1526 * @return TopicPublisher
1527 * @throws JMSException if the session fails to create a publisher due to
1528 * some internal error.
1529 * @throws InvalidDestinationException if an invalid topic is specified.
1530 */
1531 public TopicPublisher createPublisher(Topic topic) throws JMSException {
1532 checkClosed();
1533
1534 if (topic instanceof CustomDestination) {
1535 CustomDestination customDestination = (CustomDestination)topic;
1536 return customDestination.createPublisher(this);
1537 }
1538 int timeSendOut = connection.getSendTimeout();
1539 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1540 }
1541
1542 /**
1543 * Unsubscribes a durable subscription that has been created by a client.
1544 * <P>
1545 * This method deletes the state being maintained on behalf of the
1546 * subscriber by its provider.
1547 * <P>
1548 * It is erroneous for a client to delete a durable subscription while there
1549 * is an active <CODE>MessageConsumer </CODE> or
1550 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1551 * message is part of a pending transaction or has not been acknowledged in
1552 * the session.
1553 *
1554 * @param name the name used to identify this subscription
1555 * @throws JMSException if the session fails to unsubscribe to the durable
1556 * subscription due to some internal error.
1557 * @throws InvalidDestinationException if an invalid subscription name is
1558 * specified.
1559 * @since 1.1
1560 */
1561 public void unsubscribe(String name) throws JMSException {
1562 checkClosed();
1563 connection.unsubscribe(name);
1564 }
1565
1566 public void dispatch(MessageDispatch messageDispatch) {
1567 try {
1568 executor.execute(messageDispatch);
1569 } catch (InterruptedException e) {
1570 Thread.currentThread().interrupt();
1571 connection.onClientInternalException(e);
1572 }
1573 }
1574
1575 /**
1576 * Acknowledges all consumed messages of the session of this consumed
1577 * message.
1578 * <P>
1579 * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1580 * for use when a client has specified that its JMS session's consumed
1581 * messages are to be explicitly acknowledged. By invoking
1582 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1583 * all messages consumed by the session that the message was delivered to.
1584 * <P>
1585 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1586 * sessions and sessions specified to use implicit acknowledgement modes.
1587 * <P>
1588 * A client may individually acknowledge each message as it is consumed, or
1589 * it may choose to acknowledge messages as an application-defined group
1590 * (which is done by calling acknowledge on the last received message of the
1591 * group, thereby acknowledging all messages consumed by the session.)
1592 * <P>
1593 * Messages that have been received but not acknowledged may be redelivered.
1594 *
1595 * @throws JMSException if the JMS provider fails to acknowledge the
1596 * messages due to some internal error.
1597 * @throws javax.jms.IllegalStateException if this method is called on a
1598 * closed session.
1599 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1600 */
1601 public void acknowledge() throws JMSException {
1602 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1603 ActiveMQMessageConsumer c = iter.next();
1604 c.acknowledge();
1605 }
1606 }
1607
1608 /**
1609 * Add a message consumer.
1610 *
1611 * @param consumer - message consumer.
1612 * @throws JMSException
1613 */
1614 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1615 this.consumers.add(consumer);
1616 if (consumer.isDurableSubscriber()) {
1617 stats.onCreateDurableSubscriber();
1618 }
1619 this.connection.addDispatcher(consumer.getConsumerId(), this);
1620 }
1621
1622 /**
1623 * Remove the message consumer.
1624 *
1625 * @param consumer - consumer to be removed.
1626 * @throws JMSException
1627 */
1628 protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1629 this.connection.removeDispatcher(consumer.getConsumerId());
1630 if (consumer.isDurableSubscriber()) {
1631 stats.onRemoveDurableSubscriber();
1632 }
1633 this.consumers.remove(consumer);
1634 this.connection.removeDispatcher(consumer);
1635 }
1636
1637 /**
1638 * Adds a message producer.
1639 *
1640 * @param producer - message producer to be added.
1641 * @throws JMSException
1642 */
1643 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1644 this.producers.add(producer);
1645 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1646 }
1647
1648 /**
1649 * Removes a message producer.
1650 *
1651 * @param producer - message producer to be removed.
1652 * @throws JMSException
1653 */
1654 protected void removeProducer(ActiveMQMessageProducer producer) {
1655 this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1656 this.producers.remove(producer);
1657 }
1658
1659 /**
1660 * Start this Session.
1661 *
1662 * @throws JMSException
1663 */
1664 protected void start() throws JMSException {
1665 started.set(true);
1666 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1667 ActiveMQMessageConsumer c = iter.next();
1668 c.start();
1669 }
1670 executor.start();
1671 }
1672
1673 /**
1674 * Stops this session.
1675 *
1676 * @throws JMSException
1677 */
1678 protected void stop() throws JMSException {
1679
1680 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1681 ActiveMQMessageConsumer c = iter.next();
1682 c.stop();
1683 }
1684
1685 started.set(false);
1686 executor.stop();
1687 }
1688
1689 /**
1690 * Returns the session id.
1691 *
1692 * @return value - session id.
1693 */
1694 protected SessionId getSessionId() {
1695 return info.getSessionId();
1696 }
1697
1698 /**
1699 * @return
1700 */
1701 protected ConsumerId getNextConsumerId() {
1702 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1703 }
1704
1705 /**
1706 * @return
1707 */
1708 protected ProducerId getNextProducerId() {
1709 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1710 }
1711
1712 /**
1713 * Sends the message for dispatch by the broker.
1714 *
1715 *
1716 * @param producer - message producer.
1717 * @param destination - message destination.
1718 * @param message - message to be sent.
1719 * @param deliveryMode - JMS messsage delivery mode.
1720 * @param priority - message priority.
1721 * @param timeToLive - message expiration.
1722 * @param producerWindow
1723 * @param onComplete
1724 * @throws JMSException
1725 */
1726 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1727 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1728
1729 checkClosed();
1730 if (destination.isTemporary() && connection.isDeleted(destination)) {
1731 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1732 }
1733 synchronized (sendMutex) {
1734 // tell the Broker we are about to start a new transaction
1735 doStartTransaction();
1736 TransactionId txid = transactionContext.getTransactionId();
1737 long sequenceNumber = producer.getMessageSequence();
1738
1739 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1740 message.setJMSDeliveryMode(deliveryMode);
1741 long expiration = 0L;
1742 if (!producer.getDisableMessageTimestamp()) {
1743 long timeStamp = System.currentTimeMillis();
1744 message.setJMSTimestamp(timeStamp);
1745 if (timeToLive > 0) {
1746 expiration = timeToLive + timeStamp;
1747 }
1748 }
1749 message.setJMSExpiration(expiration);
1750 message.setJMSPriority(priority);
1751 message.setJMSRedelivered(false);
1752
1753 // transform to our own message format here
1754 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1755
1756 // Set the message id.
1757 if (msg == message) {
1758 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1759 } else {
1760 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1761 message.setJMSMessageID(msg.getMessageId().toString());
1762 }
1763 //clear the brokerPath in case we are re-sending this message
1764 msg.setBrokerPath(null);
1765 // destination format is provider specific so only set on transformed message
1766 msg.setJMSDestination(destination);
1767
1768 msg.setTransactionId(txid);
1769 if (connection.isCopyMessageOnSend()) {
1770 msg = (ActiveMQMessage)msg.copy();
1771 }
1772 msg.setConnection(connection);
1773 msg.onSend();
1774 msg.setProducerId(msg.getMessageId().getProducerId());
1775 if (LOG.isTraceEnabled()) {
1776 LOG.trace(getSessionId() + " sending message: " + msg);
1777 }
1778 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1779 this.connection.asyncSendPacket(msg);
1780 if (producerWindow != null) {
1781 // Since we defer lots of the marshaling till we hit the
1782 // wire, this might not
1783 // provide and accurate size. We may change over to doing
1784 // more aggressive marshaling,
1785 // to get more accurate sizes.. this is more important once
1786 // users start using producer window
1787 // flow control.
1788 int size = msg.getSize();
1789 producerWindow.increaseUsage(size);
1790 }
1791 } else {
1792 if (sendTimeout > 0 && onComplete==null) {
1793 this.connection.syncSendPacket(msg,sendTimeout);
1794 }else {
1795 this.connection.syncSendPacket(msg, onComplete);
1796 }
1797 }
1798
1799 }
1800 }
1801
1802 /**
1803 * Send TransactionInfo to indicate transaction has started
1804 *
1805 * @throws JMSException if some internal error occurs
1806 */
1807 protected void doStartTransaction() throws JMSException {
1808 if (getTransacted() && !transactionContext.isInXATransaction()) {
1809 transactionContext.begin();
1810 }
1811 }
1812
1813 /**
1814 * Checks whether the session has unconsumed messages.
1815 *
1816 * @return true - if there are unconsumed messages.
1817 */
1818 public boolean hasUncomsumedMessages() {
1819 return executor.hasUncomsumedMessages();
1820 }
1821
1822 /**
1823 * Checks whether the session uses transactions.
1824 *
1825 * @return true - if the session uses transactions.
1826 */
1827 public boolean isTransacted() {
1828 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1829 }
1830
1831 /**
1832 * Checks whether the session used client acknowledgment.
1833 *
1834 * @return true - if the session uses client acknowledgment.
1835 */
1836 protected boolean isClientAcknowledge() {
1837 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1838 }
1839
1840 /**
1841 * Checks whether the session used auto acknowledgment.
1842 *
1843 * @return true - if the session uses client acknowledgment.
1844 */
1845 public boolean isAutoAcknowledge() {
1846 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1847 }
1848
1849 /**
1850 * Checks whether the session used dup ok acknowledgment.
1851 *
1852 * @return true - if the session uses client acknowledgment.
1853 */
1854 public boolean isDupsOkAcknowledge() {
1855 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1856 }
1857
1858 public boolean isIndividualAcknowledge(){
1859 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1860 }
1861
1862 /**
1863 * Returns the message delivery listener.
1864 *
1865 * @return deliveryListener - message delivery listener.
1866 */
1867 public DeliveryListener getDeliveryListener() {
1868 return deliveryListener;
1869 }
1870
1871 /**
1872 * Sets the message delivery listener.
1873 *
1874 * @param deliveryListener - message delivery listener.
1875 */
1876 public void setDeliveryListener(DeliveryListener deliveryListener) {
1877 this.deliveryListener = deliveryListener;
1878 }
1879
1880 /**
1881 * Returns the SessionInfo bean.
1882 *
1883 * @return info - SessionInfo bean.
1884 * @throws JMSException
1885 */
1886 protected SessionInfo getSessionInfo() throws JMSException {
1887 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1888 return info;
1889 }
1890
1891 /**
1892 * Send the asynchronus command.
1893 *
1894 * @param command - command to be executed.
1895 * @throws JMSException
1896 */
1897 public void asyncSendPacket(Command command) throws JMSException {
1898 connection.asyncSendPacket(command);
1899 }
1900
1901 /**
1902 * Send the synchronus command.
1903 *
1904 * @param command - command to be executed.
1905 * @return Response
1906 * @throws JMSException
1907 */
1908 public Response syncSendPacket(Command command) throws JMSException {
1909 return connection.syncSendPacket(command);
1910 }
1911
1912 public long getNextDeliveryId() {
1913 return deliveryIdGenerator.getNextSequenceId();
1914 }
1915
1916 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1917
1918 List<MessageDispatch> c = unconsumedMessages.removeAll();
1919 for (MessageDispatch md : c) {
1920 this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1921 }
1922 Collections.reverse(c);
1923
1924 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1925 MessageDispatch md = iter.next();
1926 executor.executeFirst(md);
1927 }
1928
1929 }
1930
1931 public boolean isRunning() {
1932 return started.get();
1933 }
1934
1935 public boolean isAsyncDispatch() {
1936 return asyncDispatch;
1937 }
1938
1939 public void setAsyncDispatch(boolean asyncDispatch) {
1940 this.asyncDispatch = asyncDispatch;
1941 }
1942
1943 /**
1944 * @return Returns the sessionAsyncDispatch.
1945 */
1946 public boolean isSessionAsyncDispatch() {
1947 return sessionAsyncDispatch;
1948 }
1949
1950 /**
1951 * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1952 */
1953 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1954 this.sessionAsyncDispatch = sessionAsyncDispatch;
1955 }
1956
1957 public MessageTransformer getTransformer() {
1958 return transformer;
1959 }
1960
1961 public ActiveMQConnection getConnection() {
1962 return connection;
1963 }
1964
1965 /**
1966 * Sets the transformer used to transform messages before they are sent on
1967 * to the JMS bus or when they are received from the bus but before they are
1968 * delivered to the JMS client
1969 */
1970 public void setTransformer(MessageTransformer transformer) {
1971 this.transformer = transformer;
1972 }
1973
1974 public BlobTransferPolicy getBlobTransferPolicy() {
1975 return blobTransferPolicy;
1976 }
1977
1978 /**
1979 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1980 * OBjects) are transferred from producers to brokers to consumers
1981 */
1982 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1983 this.blobTransferPolicy = blobTransferPolicy;
1984 }
1985
1986 public List<MessageDispatch> getUnconsumedMessages() {
1987 return executor.getUnconsumedMessages();
1988 }
1989
1990 @Override
1991 public String toString() {
1992 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1993 }
1994
1995 public void checkMessageListener() throws JMSException {
1996 if (messageListener != null) {
1997 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1998 }
1999 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
2000 ActiveMQMessageConsumer consumer = i.next();
2001 if (consumer.getMessageListener() != null) {
2002 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2003 }
2004 }
2005 }
2006
2007 protected void setOptimizeAcknowledge(boolean value) {
2008 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2009 ActiveMQMessageConsumer c = iter.next();
2010 c.setOptimizeAcknowledge(value);
2011 }
2012 }
2013
2014 protected void setPrefetchSize(ConsumerId id, int prefetch) {
2015 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2016 ActiveMQMessageConsumer c = iter.next();
2017 if (c.getConsumerId().equals(id)) {
2018 c.setPrefetchSize(prefetch);
2019 break;
2020 }
2021 }
2022 }
2023
2024 protected void close(ConsumerId id) {
2025 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2026 ActiveMQMessageConsumer c = iter.next();
2027 if (c.getConsumerId().equals(id)) {
2028 try {
2029 c.close();
2030 } catch (JMSException e) {
2031 LOG.warn("Exception closing consumer", e);
2032 }
2033 LOG.warn("Closed consumer on Command");
2034 break;
2035 }
2036 }
2037 }
2038
2039 public boolean isInUse(ActiveMQTempDestination destination) {
2040 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2041 ActiveMQMessageConsumer c = iter.next();
2042 if (c.isInUse(destination)) {
2043 return true;
2044 }
2045 }
2046 return false;
2047 }
2048
2049 /**
2050 * highest sequence id of the last message delivered by this session.
2051 * Passed to the broker in the close command, maintained by dispose()
2052 * @return lastDeliveredSequenceId
2053 */
2054 public long getLastDeliveredSequenceId() {
2055 return lastDeliveredSequenceId;
2056 }
2057
2058 protected void sendAck(MessageAck ack) throws JMSException {
2059 sendAck(ack,false);
2060 }
2061
2062 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2063 if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2064 asyncSendPacket(ack);
2065 } else {
2066 syncSendPacket(ack);
2067 }
2068 }
2069
2070 protected Scheduler getScheduler() throws JMSException {
2071 return this.connection.getScheduler();
2072 }
2073
2074 protected ThreadPoolExecutor getConnectionExecutor() {
2075 return this.connectionExecutor;
2076 }
2077 }