001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.File;
020 import java.io.InputStream;
021 import java.io.Serializable;
022 import java.net.URL;
023 import java.util.Collections;
024 import java.util.Iterator;
025 import java.util.List;
026 import java.util.concurrent.CopyOnWriteArrayList;
027 import java.util.concurrent.ThreadPoolExecutor;
028 import java.util.concurrent.atomic.AtomicBoolean;
029
030 import javax.jms.BytesMessage;
031 import javax.jms.Destination;
032 import javax.jms.IllegalStateException;
033 import javax.jms.InvalidDestinationException;
034 import javax.jms.InvalidSelectorException;
035 import javax.jms.JMSException;
036 import javax.jms.MapMessage;
037 import javax.jms.Message;
038 import javax.jms.MessageConsumer;
039 import javax.jms.MessageListener;
040 import javax.jms.MessageProducer;
041 import javax.jms.ObjectMessage;
042 import javax.jms.Queue;
043 import javax.jms.QueueBrowser;
044 import javax.jms.QueueReceiver;
045 import javax.jms.QueueSender;
046 import javax.jms.QueueSession;
047 import javax.jms.Session;
048 import javax.jms.StreamMessage;
049 import javax.jms.TemporaryQueue;
050 import javax.jms.TemporaryTopic;
051 import javax.jms.TextMessage;
052 import javax.jms.Topic;
053 import javax.jms.TopicPublisher;
054 import javax.jms.TopicSession;
055 import javax.jms.TopicSubscriber;
056 import javax.jms.TransactionRolledBackException;
057
058 import org.apache.activemq.blob.BlobDownloader;
059 import org.apache.activemq.blob.BlobTransferPolicy;
060 import org.apache.activemq.blob.BlobUploader;
061 import org.apache.activemq.command.ActiveMQBlobMessage;
062 import org.apache.activemq.command.ActiveMQBytesMessage;
063 import org.apache.activemq.command.ActiveMQDestination;
064 import org.apache.activemq.command.ActiveMQMapMessage;
065 import org.apache.activemq.command.ActiveMQMessage;
066 import org.apache.activemq.command.ActiveMQObjectMessage;
067 import org.apache.activemq.command.ActiveMQQueue;
068 import org.apache.activemq.command.ActiveMQStreamMessage;
069 import org.apache.activemq.command.ActiveMQTempDestination;
070 import org.apache.activemq.command.ActiveMQTempQueue;
071 import org.apache.activemq.command.ActiveMQTempTopic;
072 import org.apache.activemq.command.ActiveMQTextMessage;
073 import org.apache.activemq.command.ActiveMQTopic;
074 import org.apache.activemq.command.Command;
075 import org.apache.activemq.command.ConsumerId;
076 import org.apache.activemq.command.MessageAck;
077 import org.apache.activemq.command.MessageDispatch;
078 import org.apache.activemq.command.MessageId;
079 import org.apache.activemq.command.ProducerId;
080 import org.apache.activemq.command.RemoveInfo;
081 import org.apache.activemq.command.Response;
082 import org.apache.activemq.command.SessionId;
083 import org.apache.activemq.command.SessionInfo;
084 import org.apache.activemq.command.TransactionId;
085 import org.apache.activemq.management.JMSSessionStatsImpl;
086 import org.apache.activemq.management.StatsCapable;
087 import org.apache.activemq.management.StatsImpl;
088 import org.apache.activemq.thread.Scheduler;
089 import org.apache.activemq.transaction.Synchronization;
090 import org.apache.activemq.usage.MemoryUsage;
091 import org.apache.activemq.util.Callback;
092 import org.apache.activemq.util.JMSExceptionSupport;
093 import org.apache.activemq.util.LongSequenceGenerator;
094 import org.slf4j.Logger;
095 import org.slf4j.LoggerFactory;
096
097 /**
098 * <P>
099 * A <CODE>Session</CODE> object is a single-threaded context for producing
100 * and consuming messages. Although it may allocate provider resources outside
101 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102 * <P>
103 * A session serves several purposes:
104 * <UL>
105 * <LI>It is a factory for its message producers and consumers.
106 * <LI>It supplies provider-optimized message factories.
107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108 * <CODE>TemporaryQueues</CODE>.
109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110 * objects for those clients that need to dynamically manipulate
111 * provider-specific destination names.
112 * <LI>It supports a single series of transactions that combine work spanning
113 * its producers and consumers into atomic units.
114 * <LI>It defines a serial order for the messages it consumes and the messages
115 * it produces.
116 * <LI>It retains messages it consumes until they have been acknowledged.
117 * <LI>It serializes execution of message listeners registered with its message
118 * consumers.
119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120 * </UL>
121 * <P>
122 * A session can create and service multiple message producers and consumers.
123 * <P>
124 * One typical use is to have a thread block on a synchronous
125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
 * use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
127 * <P>
128 * If a client desires to have one thread produce messages while others consume
129 * them, the client should use a separate session for its producing thread.
130 * <P>
131 * Once a connection has been started, any session with one or more registered
132 * message listeners is dedicated to the thread of control that delivers
133 * messages to it. It is erroneous for client code to use this session or any of
134 * its constituent objects from another thread of control. The only exception to
135 * this rule is the use of the session or connection <CODE>close</CODE>
136 * method.
137 * <P>
138 * It should be easy for most clients to partition their work naturally into
139 * sessions. This model allows clients to start simply and incrementally add
140 * message processing complexity as their need for concurrency grows.
141 * <P>
142 * The <CODE>close</CODE> method is the only session method that can be called
143 * while some other session method is being executed in another thread.
144 * <P>
145 * A session may be specified as transacted. Each transacted session supports a
146 * single series of transactions. Each transaction groups a set of message sends
147 * and a set of message receives into an atomic unit of work. In effect,
148 * transactions organize a session's input message stream and output message
149 * stream into series of atomic units. When a transaction commits, its atomic
150 * unit of input is acknowledged and its associated atomic unit of output is
151 * sent. If a transaction rollback is done, the transaction's sent messages are
152 * destroyed and the session's input is automatically recovered.
153 * <P>
154 * The content of a transaction's input and output units is simply those
155 * messages that have been produced and consumed within the session's current
156 * transaction.
157 * <P>
158 * A transaction is completed using either its session's <CODE>commit</CODE>
159 * method or its session's <CODE>rollback </CODE> method. The completion of a
160 * session's current transaction automatically begins the next. The result is
161 * that a transacted session always has a current transaction within which its
162 * work is done.
163 * <P>
164 * The Java Transaction Service (JTS) or some other transaction monitor may be
165 * used to combine a session's transaction with transactions on other resources
166 * (databases, other JMS sessions, etc.). Since Java distributed transactions
167 * are controlled via the Java Transaction API (JTA), use of the session's
168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169 * prohibited.
170 * <P>
171 * The JMS API does not require support for JTA; however, it does define how a
172 * provider supplies this support.
173 * <P>
174 * Although it is also possible for a JMS client to handle distributed
175 * transactions directly, it is unlikely that many JMS clients will do this.
176 * Support for JTA in the JMS API is targeted at systems vendors who will be
177 * integrating the JMS API into their application server products.
178 *
179 *
180 * @see javax.jms.Session
181 * @see javax.jms.QueueSession
182 * @see javax.jms.TopicSession
183 * @see javax.jms.XASession
184 */
185 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186
/**
 * Only acknowledge an individual message - using message.acknowledge() -
 * as opposed to CLIENT_ACKNOWLEDGE, which acknowledges all messages
 * consumed by a session at the time acknowledge() is called.
 */
public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
/**
 * Alias of {@link #INDIVIDUAL_ACKNOWLEDGE}. NOTE(review): presumably the
 * largest acknowledgement-mode value accepted - confirm against callers.
 */
public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195
196 public static interface DeliveryListener {
197 void beforeDelivery(ActiveMQSession session, Message msg);
198
199 void afterDelivery(ActiveMQSession session, Message msg);
200 }
201
202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
203 private final ThreadPoolExecutor connectionExecutor;
204
205 protected int acknowledgementMode;
206 protected final ActiveMQConnection connection;
207 protected final SessionInfo info;
208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
211 protected final ActiveMQSessionExecutor executor;
212 protected final AtomicBoolean started = new AtomicBoolean(false);
213
214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
216
217 protected boolean closed;
218 private volatile boolean synchronizationRegistered;
219 protected boolean asyncDispatch;
220 protected boolean sessionAsyncDispatch;
221 protected final boolean debug;
222 protected Object sendMutex = new Object();
223
224 private MessageListener messageListener;
225 private final JMSSessionStatsImpl stats;
226 private TransactionContext transactionContext;
227 private DeliveryListener deliveryListener;
228 private MessageTransformer transformer;
229 private BlobTransferPolicy blobTransferPolicy;
230 private long lastDeliveredSequenceId;
231
/**
 * Builds a session on top of the given connection: the session's
 * {@link SessionInfo} is sent over the connection, the session registers
 * itself with the connection and, when the connection is already started,
 * the session is started as well.
 *
 * @param connection the connection that owns the session
 * @param sessionId id whose value is used to build the session info
 * @param acknowledgeMode the acknowledgement mode; n.b. if transacted the
 *        acknowledgeMode == Session.SESSION_TRANSACTED
 * @param asyncDispatch value stored in the {@code asyncDispatch} field
 * @param sessionAsyncDispatch value stored in the
 *        {@code sessionAsyncDispatch} field
 * @throws JMSException on internal error
 */
protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
    this.debug = LOG.isDebugEnabled();
    this.connection = connection;
    this.acknowledgementMode = acknowledgeMode;
    this.asyncDispatch = asyncDispatch;
    this.sessionAsyncDispatch = sessionAsyncDispatch;
    this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());

    setTransactionContext(new TransactionContext(connection));
    this.stats = new JMSSessionStatsImpl(this.producers, this.consumers);

    // The session info is sent before the session becomes visible to the connection.
    connection.asyncSendPacket(this.info);

    // Inherit the transformer and blob policy configured on the connection.
    setTransformer(connection.getTransformer());
    setBlobTransferPolicy(connection.getBlobTransferPolicy());

    this.connectionExecutor = connection.getExecutor();
    this.executor = new ActiveMQSessionExecutor(this);

    connection.addSession(this);
    if (connection.isStarted()) {
        start();
    }
}

/**
 * Same as the five-argument constructor with {@code sessionAsyncDispatch}
 * set to {@code true}.
 *
 * @param connection the connection that owns the session
 * @param sessionId id whose value is used to build the session info
 * @param acknowledgeMode the acknowledgement mode
 * @param asyncDispatch value stored in the {@code asyncDispatch} field
 * @throws JMSException on internal error
 */
protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
    this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
}
267
268 /**
269 * Sets the transaction context of the session.
270 *
271 * @param transactionContext - provides the means to control a JMS
272 * transaction.
273 */
274 public void setTransactionContext(TransactionContext transactionContext) {
275 this.transactionContext = transactionContext;
276 }
277
278 /**
279 * Returns the transaction context of the session.
280 *
281 * @return transactionContext - session's transaction context.
282 */
283 public TransactionContext getTransactionContext() {
284 return transactionContext;
285 }
286
287 /*
288 * (non-Javadoc)
289 *
290 * @see org.apache.activemq.management.StatsCapable#getStats()
291 */
292 public StatsImpl getStats() {
293 return stats;
294 }
295
296 /**
297 * Returns the session's statistics.
298 *
299 * @return stats - session's statistics.
300 */
301 public JMSSessionStatsImpl getSessionStats() {
302 return stats;
303 }
304
305 /**
306 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
307 * object is used to send a message containing a stream of uninterpreted
308 * bytes.
309 *
310 * @return the an ActiveMQBytesMessage
311 * @throws JMSException if the JMS provider fails to create this message due
312 * to some internal error.
313 */
314 public BytesMessage createBytesMessage() throws JMSException {
315 ActiveMQBytesMessage message = new ActiveMQBytesMessage();
316 configureMessage(message);
317 return message;
318 }
319
320 /**
321 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
322 * object is used to send a self-defining set of name-value pairs, where
323 * names are <CODE>String</CODE> objects and values are primitive values
324 * in the Java programming language.
325 *
326 * @return an ActiveMQMapMessage
327 * @throws JMSException if the JMS provider fails to create this message due
328 * to some internal error.
329 */
330 public MapMessage createMapMessage() throws JMSException {
331 ActiveMQMapMessage message = new ActiveMQMapMessage();
332 configureMessage(message);
333 return message;
334 }
335
336 /**
337 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
338 * interface is the root interface of all JMS messages. A
339 * <CODE>Message</CODE> object holds all the standard message header
340 * information. It can be sent when a message containing only header
341 * information is sufficient.
342 *
343 * @return an ActiveMQMessage
344 * @throws JMSException if the JMS provider fails to create this message due
345 * to some internal error.
346 */
347 public Message createMessage() throws JMSException {
348 ActiveMQMessage message = new ActiveMQMessage();
349 configureMessage(message);
350 return message;
351 }
352
353 /**
354 * Creates an <CODE>ObjectMessage</CODE> object. An
355 * <CODE>ObjectMessage</CODE> object is used to send a message that
356 * contains a serializable Java object.
357 *
358 * @return an ActiveMQObjectMessage
359 * @throws JMSException if the JMS provider fails to create this message due
360 * to some internal error.
361 */
362 public ObjectMessage createObjectMessage() throws JMSException {
363 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
364 configureMessage(message);
365 return message;
366 }
367
368 /**
369 * Creates an initialized <CODE>ObjectMessage</CODE> object. An
370 * <CODE>ObjectMessage</CODE> object is used to send a message that
371 * contains a serializable Java object.
372 *
373 * @param object the object to use to initialize this message
374 * @return an ActiveMQObjectMessage
375 * @throws JMSException if the JMS provider fails to create this message due
376 * to some internal error.
377 */
378 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
379 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
380 configureMessage(message);
381 message.setObject(object);
382 return message;
383 }
384
385 /**
386 * Creates a <CODE>StreamMessage</CODE> object. A
387 * <CODE>StreamMessage</CODE> object is used to send a self-defining
388 * stream of primitive values in the Java programming language.
389 *
390 * @return an ActiveMQStreamMessage
391 * @throws JMSException if the JMS provider fails to create this message due
392 * to some internal error.
393 */
394 public StreamMessage createStreamMessage() throws JMSException {
395 ActiveMQStreamMessage message = new ActiveMQStreamMessage();
396 configureMessage(message);
397 return message;
398 }
399
400 /**
401 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
402 * object is used to send a message containing a <CODE>String</CODE>
403 * object.
404 *
405 * @return an ActiveMQTextMessage
406 * @throws JMSException if the JMS provider fails to create this message due
407 * to some internal error.
408 */
409 public TextMessage createTextMessage() throws JMSException {
410 ActiveMQTextMessage message = new ActiveMQTextMessage();
411 configureMessage(message);
412 return message;
413 }
414
415 /**
416 * Creates an initialized <CODE>TextMessage</CODE> object. A
417 * <CODE>TextMessage</CODE> object is used to send a message containing a
418 * <CODE>String</CODE>.
419 *
420 * @param text the string used to initialize this message
421 * @return an ActiveMQTextMessage
422 * @throws JMSException if the JMS provider fails to create this message due
423 * to some internal error.
424 */
425 public TextMessage createTextMessage(String text) throws JMSException {
426 ActiveMQTextMessage message = new ActiveMQTextMessage();
427 message.setText(text);
428 configureMessage(message);
429 return message;
430 }
431
432 /**
433 * Creates an initialized <CODE>BlobMessage</CODE> object. A
434 * <CODE>BlobMessage</CODE> object is used to send a message containing a
435 * <CODE>URL</CODE> which points to some network addressible BLOB.
436 *
437 * @param url the network addressable URL used to pass directly to the
438 * consumer
439 * @return a BlobMessage
440 * @throws JMSException if the JMS provider fails to create this message due
441 * to some internal error.
442 */
443 public BlobMessage createBlobMessage(URL url) throws JMSException {
444 return createBlobMessage(url, false);
445 }
446
447 /**
448 * Creates an initialized <CODE>BlobMessage</CODE> object. A
449 * <CODE>BlobMessage</CODE> object is used to send a message containing a
450 * <CODE>URL</CODE> which points to some network addressible BLOB.
451 *
452 * @param url the network addressable URL used to pass directly to the
453 * consumer
454 * @param deletedByBroker indicates whether or not the resource is deleted
455 * by the broker when the message is acknowledged
456 * @return a BlobMessage
457 * @throws JMSException if the JMS provider fails to create this message due
458 * to some internal error.
459 */
460 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
461 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
462 configureMessage(message);
463 message.setURL(url);
464 message.setDeletedByBroker(deletedByBroker);
465 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
466 return message;
467 }
468
469 /**
470 * Creates an initialized <CODE>BlobMessage</CODE> object. A
471 * <CODE>BlobMessage</CODE> object is used to send a message containing
472 * the <CODE>File</CODE> content. Before the message is sent the file
473 * conent will be uploaded to the broker or some other remote repository
474 * depending on the {@link #getBlobTransferPolicy()}.
475 *
476 * @param file the file to be uploaded to some remote repo (or the broker)
477 * depending on the strategy
478 * @return a BlobMessage
479 * @throws JMSException if the JMS provider fails to create this message due
480 * to some internal error.
481 */
482 public BlobMessage createBlobMessage(File file) throws JMSException {
483 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
484 configureMessage(message);
485 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
486 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
487 message.setDeletedByBroker(true);
488 message.setName(file.getName());
489 return message;
490 }
491
492 /**
493 * Creates an initialized <CODE>BlobMessage</CODE> object. A
494 * <CODE>BlobMessage</CODE> object is used to send a message containing
495 * the <CODE>File</CODE> content. Before the message is sent the file
496 * conent will be uploaded to the broker or some other remote repository
497 * depending on the {@link #getBlobTransferPolicy()}.
498 *
499 * @param in the stream to be uploaded to some remote repo (or the broker)
500 * depending on the strategy
501 * @return a BlobMessage
502 * @throws JMSException if the JMS provider fails to create this message due
503 * to some internal error.
504 */
505 public BlobMessage createBlobMessage(InputStream in) throws JMSException {
506 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
507 configureMessage(message);
508 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
509 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
510 message.setDeletedByBroker(true);
511 return message;
512 }
513
514 /**
515 * Indicates whether the session is in transacted mode.
516 *
517 * @return true if the session is in transacted mode
518 * @throws JMSException if there is some internal error.
519 */
520 public boolean getTransacted() throws JMSException {
521 checkClosed();
522 return isTransacted();
523 }
524
525 /**
526 * Returns the acknowledgement mode of the session. The acknowledgement mode
527 * is set at the time that the session is created. If the session is
528 * transacted, the acknowledgement mode is ignored.
529 *
530 * @return If the session is not transacted, returns the current
531 * acknowledgement mode for the session. If the session is
532 * transacted, returns SESSION_TRANSACTED.
533 * @throws JMSException
534 * @see javax.jms.Connection#createSession(boolean,int)
535 * @since 1.1 exception JMSException if there is some internal error.
536 */
537 public int getAcknowledgeMode() throws JMSException {
538 checkClosed();
539 return this.acknowledgementMode;
540 }
541
542 /**
543 * Commits all messages done in this transaction and releases any locks
544 * currently held.
545 *
546 * @throws JMSException if the JMS provider fails to commit the transaction
547 * due to some internal error.
548 * @throws TransactionRolledBackException if the transaction is rolled back
549 * due to some internal error during commit.
550 * @throws javax.jms.IllegalStateException if the method is not called by a
551 * transacted session.
552 */
553 public void commit() throws JMSException {
554 checkClosed();
555 if (!getTransacted()) {
556 throw new javax.jms.IllegalStateException("Not a transacted session");
557 }
558 if (LOG.isDebugEnabled()) {
559 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
560 }
561 transactionContext.commit();
562 }
563
564 /**
565 * Rolls back any messages done in this transaction and releases any locks
566 * currently held.
567 *
568 * @throws JMSException if the JMS provider fails to roll back the
569 * transaction due to some internal error.
570 * @throws javax.jms.IllegalStateException if the method is not called by a
571 * transacted session.
572 */
573 public void rollback() throws JMSException {
574 checkClosed();
575 if (!getTransacted()) {
576 throw new javax.jms.IllegalStateException("Not a transacted session");
577 }
578 if (LOG.isDebugEnabled()) {
579 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId());
580 }
581 transactionContext.rollback();
582 }
583
584 /**
585 * Closes the session.
586 * <P>
587 * Since a provider may allocate some resources on behalf of a session
588 * outside the JVM, clients should close the resources when they are not
589 * needed. Relying on garbage collection to eventually reclaim these
590 * resources may not be timely enough.
591 * <P>
592 * There is no need to close the producers and consumers of a closed
593 * session.
594 * <P>
595 * This call will block until a <CODE>receive</CODE> call or message
596 * listener in progress has completed. A blocked message consumer
597 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
598 * is closed.
599 * <P>
600 * Closing a transacted session must roll back the transaction in progress.
601 * <P>
602 * This method is the only <CODE>Session</CODE> method that can be called
603 * concurrently.
604 * <P>
605 * Invoking any other <CODE>Session</CODE> method on a closed session must
606 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
607 * closed session must <I>not </I> throw an exception.
608 *
609 * @throws JMSException if the JMS provider fails to close the session due
610 * to some internal error.
611 */
612 public void close() throws JMSException {
613 if (!closed) {
614 if (getTransactionContext().isInXATransaction()) {
615 if (!synchronizationRegistered) {
616 synchronizationRegistered = true;
617 getTransactionContext().addSynchronization(new Synchronization() {
618
619 @Override
620 public void afterCommit() throws Exception {
621 doClose();
622 synchronizationRegistered = false;
623 }
624
625 @Override
626 public void afterRollback() throws Exception {
627 doClose();
628 synchronizationRegistered = false;
629 }
630 });
631 }
632
633 } else {
634 doClose();
635 }
636 }
637 }
638
639 private void doClose() throws JMSException {
640 boolean interrupted = Thread.interrupted();
641 dispose();
642 RemoveInfo removeCommand = info.createRemoveCommand();
643 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
644 connection.asyncSendPacket(removeCommand);
645 if (interrupted) {
646 Thread.currentThread().interrupt();
647 }
648 }
649
650 void clearMessagesInProgress() {
651 executor.clearMessagesInProgress();
652 // we are called from inside the transport reconnection logic
653 // which involves us clearing all the connections' consumers
654 // dispatch and delivered lists. So rather than trying to
655 // grab a mutex (which could be already owned by the message
656 // listener calling the send or an ack) we allow it to complete in
657 // a separate thread via the scheduler and notify us via
658 // connection.transportInterruptionProcessingComplete()
659 //
660 for (final ActiveMQMessageConsumer consumer : consumers) {
661 consumer.inProgressClearRequired();
662 try {
663 connection.getScheduler().executeAfterDelay(new Runnable() {
664 public void run() {
665 consumer.clearMessagesInProgress();
666 }}, 0l);
667 } catch (JMSException e) {
668 connection.onClientInternalException(e);
669 }
670 }
671 }
672
673 void deliverAcks() {
674 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
675 ActiveMQMessageConsumer consumer = iter.next();
676 consumer.deliverAcks();
677 }
678 }
679
680 public synchronized void dispose() throws JMSException {
681 if (!closed) {
682
683 try {
684 executor.stop();
685
686 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
687 ActiveMQMessageConsumer consumer = iter.next();
688 consumer.setFailureError(connection.getFirstFailureError());
689 consumer.dispose();
690 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
691 }
692 consumers.clear();
693
694 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
695 ActiveMQMessageProducer producer = iter.next();
696 producer.dispose();
697 }
698 producers.clear();
699
700 try {
701 if (getTransactionContext().isInLocalTransaction()) {
702 rollback();
703 }
704 } catch (JMSException e) {
705 }
706
707 } finally {
708 connection.removeSession(this);
709 this.transactionContext = null;
710 closed = true;
711 }
712 }
713 }
714
715 /**
716 * Checks that the session is not closed then configures the message
717 */
718 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
719 checkClosed();
720 message.setConnection(connection);
721 }
722
723 /**
724 * Check if the session is closed. It is used for ensuring that the session
725 * is open before performing various operations.
726 *
727 * @throws IllegalStateException if the Session is closed
728 */
729 protected void checkClosed() throws IllegalStateException {
730 if (closed) {
731 throw new IllegalStateException("The Session is closed");
732 }
733 }
734
735 /**
736 * Checks if the session is closed.
737 *
738 * @return true if the session is closed, false otherwise.
739 */
740 public boolean isClosed() {
741 return closed;
742 }
743
744 /**
745 * Stops message delivery in this session, and restarts message delivery
746 * with the oldest unacknowledged message.
747 * <P>
748 * All consumers deliver messages in a serial order. Acknowledging a
749 * received message automatically acknowledges all messages that have been
750 * delivered to the client.
751 * <P>
752 * Restarting a session causes it to take the following actions:
753 * <UL>
754 * <LI>Stop message delivery
755 * <LI>Mark all messages that might have been delivered but not
756 * acknowledged as "redelivered"
757 * <LI>Restart the delivery sequence including all unacknowledged messages
758 * that had been previously delivered. Redelivered messages do not have to
759 * be delivered in exactly their original delivery order.
760 * </UL>
761 *
762 * @throws JMSException if the JMS provider fails to stop and restart
763 * message delivery due to some internal error.
764 * @throws IllegalStateException if the method is called by a transacted
765 * session.
766 */
767 public void recover() throws JMSException {
768
769 checkClosed();
770 if (getTransacted()) {
771 throw new IllegalStateException("This session is transacted");
772 }
773
774 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
775 ActiveMQMessageConsumer c = iter.next();
776 c.rollback();
777 }
778
779 }
780
781 /**
782 * Returns the session's distinguished message listener (optional).
783 *
784 * @return the message listener associated with this session
785 * @throws JMSException if the JMS provider fails to get the message
786 * listener due to an internal error.
787 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
788 * @see javax.jms.ServerSessionPool
789 * @see javax.jms.ServerSession
790 */
791 public MessageListener getMessageListener() throws JMSException {
792 checkClosed();
793 return this.messageListener;
794 }
795
796 /**
797 * Sets the session's distinguished message listener (optional).
798 * <P>
799 * When the distinguished message listener is set, no other form of message
800 * receipt in the session can be used; however, all forms of sending
801 * messages are still supported.
802 * <P>
803 * If this session has been closed, then an {@link IllegalStateException} is
804 * thrown, if trying to set a new listener. However setting the listener
805 * to <tt>null</tt> is allowed, to clear the listener, even if this session
806 * has been closed prior.
807 * <P>
808 * This is an expert facility not used by regular JMS clients.
809 *
810 * @param listener the message listener to associate with this session
811 * @throws JMSException if the JMS provider fails to set the message
812 * listener due to an internal error.
813 * @see javax.jms.Session#getMessageListener()
814 * @see javax.jms.ServerSessionPool
815 * @see javax.jms.ServerSession
816 */
817 public void setMessageListener(MessageListener listener) throws JMSException {
818 // only check for closed if we set a new listener, as we allow to clear
819 // the listener, such as when an application is shutting down, and is
820 // no longer using a message listener on this session
821 if (listener != null) {
822 checkClosed();
823 }
824 this.messageListener = listener;
825
826 if (listener != null) {
827 executor.setDispatchedBySessionPool(true);
828 }
829 }
830
/**
 * Optional operation, intended to be used only by Application Servers, not
 * by ordinary JMS clients.
 * <P>
 * Drains the session executor with {@code dequeueNoWait()} until it returns
 * {@code null}. Each dequeued message that is neither expired nor a
 * duplicate is handed to the session's distinguished message listener and
 * then acknowledged with a standard ack, inside the session's transaction
 * context when it supplies a transaction id. When a transaction id is
 * present, a rollback synchronization is registered that either poison-acks
 * the message or schedules it for redelivery.
 *
 * @see javax.jms.ServerSession
 */
public void run() {
    MessageDispatch messageDispatch;
    // Non-blocking drain: the loop ends as soon as the executor has nothing queued.
    while ((messageDispatch = executor.dequeueNoWait()) != null) {
        // Final copy so the anonymous classes below can capture the dispatch.
        final MessageDispatch md = messageDispatch;
        ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
        if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
            // TODO: Ack it without delivery to client
            // NOTE(review): as written, skipped messages are neither acked nor
            // reported to the deliveryListener.
            continue;
        }

        // In client / individual acknowledge mode the message receives a
        // callback whose body is empty; the ack itself is sent further down.
        if (isClientAcknowledge()||isIndividualAcknowledge()) {
            message.setAcknowledgeCallback(new Callback() {
                public void execute() throws Exception {
                }
            });
        }

        if (deliveryListener != null) {
            deliveryListener.beforeDelivery(this, message);
        }

        md.setDeliverySequenceId(getNextDeliveryId());

        try {
            // NOTE(review): messageListener is not null-checked here; a null
            // listener would raise a NullPointerException that the catch
            // below logs and reports.
            messageListener.onMessage(message);
        } catch (RuntimeException e) {
            LOG.error("error dispatching message: ", e);
            // A problem while invoking the MessageListener does not
            // in general indicate a problem with the connection to the broker, i.e.
            // it will usually be sufficient to let the afterDelivery() method either
            // commit or roll back in order to deal with the exception.
            // However, we notify any registered client internal exception listener
            // of the problem.
            connection.onClientInternalException(e);
        }

        // The ack below is attempted whether or not the listener threw a
        // RuntimeException, since that exception was handled above.
        try {
            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
            ack.setFirstMessageId(md.getMessage().getMessageId());
            doStartTransaction();
            ack.setTransactionId(getTransactionContext().getTransactionId());
            // A rollback hook is registered only when the ack carries a transaction id.
            if (ack.getTransactionId() != null) {
                getTransactionContext().addSynchronization(new Synchronization() {

                    @Override
                    public void afterRollback() throws Exception {
                        md.getMessage().onMessageRolledBack();
                        // ensure we don't filter this as a duplicate
                        connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
                        RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                        int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                        // Redelivery limit exceeded (and a limit is configured): give up on the message.
                        if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                            && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
                            // We need to NACK the message so that it gets
                            // sent to the DLQ.
                            // Acknowledge the last message.
                            MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                            ack.setFirstMessageId(md.getMessage().getMessageId());
                            asyncSendPacket(ack);
                        } else {

                            // Send a redelivered ack for this message, then
                            // re-dispatch it locally after a delay.
                            MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                            ack.setFirstMessageId(md.getMessage().getMessageId());
                            asyncSendPacket(ack);

                            // Figure out how long we should wait to resend
                            // this message: start from the initial delay and
                            // apply the policy's next-delay step once per
                            // redelivery already counted on the message.
                            long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                            for (int i = 0; i < redeliveryCounter; i++) {
                                redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                            }
                            connection.getScheduler().executeAfterDelay(new Runnable() {

                                public void run() {
                                    // Hand the same dispatch back to its consumer.
                                    ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
                                }
                            }, redeliveryDelay);
                        }
                    }
                });
            }
            asyncSendPacket(ack);
        } catch (Throwable e) {
            // Any failure while building or sending the ack is reported, not rethrown.
            connection.onClientInternalException(e);
        }

        if (deliveryListener != null) {
            deliveryListener.afterDelivery(this, message);
        }
    }
}
929
930 /**
931 * Creates a <CODE>MessageProducer</CODE> to send messages to the
932 * specified destination.
933 * <P>
934 * A client uses a <CODE>MessageProducer</CODE> object to send messages to
935 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
936 * inherit from <CODE>Destination</CODE>, they can be used in the
937 * destination parameter to create a <CODE>MessageProducer</CODE> object.
938 *
939 * @param destination the <CODE>Destination</CODE> to send to, or null if
940 * this is a producer which does not have a specified
941 * destination.
942 * @return the MessageProducer
943 * @throws JMSException if the session fails to create a MessageProducer due
944 * to some internal error.
945 * @throws InvalidDestinationException if an invalid destination is
946 * specified.
947 * @since 1.1
948 */
949 public MessageProducer createProducer(Destination destination) throws JMSException {
950 checkClosed();
951 if (destination instanceof CustomDestination) {
952 CustomDestination customDestination = (CustomDestination)destination;
953 return customDestination.createProducer(this);
954 }
955 int timeSendOut = connection.getSendTimeout();
956 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
957 }
958
959 /**
960 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
961 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
962 * <CODE>Destination</CODE>, they can be used in the destination
963 * parameter to create a <CODE>MessageConsumer</CODE>.
964 *
965 * @param destination the <CODE>Destination</CODE> to access.
966 * @return the MessageConsumer
967 * @throws JMSException if the session fails to create a consumer due to
968 * some internal error.
969 * @throws InvalidDestinationException if an invalid destination is
970 * specified.
971 * @since 1.1
972 */
973 public MessageConsumer createConsumer(Destination destination) throws JMSException {
974 return createConsumer(destination, (String) null);
975 }
976
977 /**
978 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
979 * using a message selector. Since <CODE> Queue</CODE> and
980 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
981 * can be used in the destination parameter to create a
982 * <CODE>MessageConsumer</CODE>.
983 * <P>
984 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
985 * that have been sent to a destination.
986 *
987 * @param destination the <CODE>Destination</CODE> to access
988 * @param messageSelector only messages with properties matching the message
989 * selector expression are delivered. A value of null or an
990 * empty string indicates that there is no message selector
991 * for the message consumer.
992 * @return the MessageConsumer
993 * @throws JMSException if the session fails to create a MessageConsumer due
994 * to some internal error.
995 * @throws InvalidDestinationException if an invalid destination is
996 * specified.
997 * @throws InvalidSelectorException if the message selector is invalid.
998 * @since 1.1
999 */
1000 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
1001 return createConsumer(destination, messageSelector, false);
1002 }
1003
1004 /**
1005 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1006 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1007 * <CODE>Destination</CODE>, they can be used in the destination
1008 * parameter to create a <CODE>MessageConsumer</CODE>.
1009 *
1010 * @param destination the <CODE>Destination</CODE> to access.
1011 * @param messageListener the listener to use for async consumption of messages
1012 * @return the MessageConsumer
1013 * @throws JMSException if the session fails to create a consumer due to
1014 * some internal error.
1015 * @throws InvalidDestinationException if an invalid destination is
1016 * specified.
1017 * @since 1.1
1018 */
1019 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1020 return createConsumer(destination, null, messageListener);
1021 }
1022
1023 /**
1024 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1025 * using a message selector. Since <CODE> Queue</CODE> and
1026 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1027 * can be used in the destination parameter to create a
1028 * <CODE>MessageConsumer</CODE>.
1029 * <P>
1030 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1031 * that have been sent to a destination.
1032 *
1033 * @param destination the <CODE>Destination</CODE> to access
1034 * @param messageSelector only messages with properties matching the message
1035 * selector expression are delivered. A value of null or an
1036 * empty string indicates that there is no message selector
1037 * for the message consumer.
1038 * @param messageListener the listener to use for async consumption of messages
1039 * @return the MessageConsumer
1040 * @throws JMSException if the session fails to create a MessageConsumer due
1041 * to some internal error.
1042 * @throws InvalidDestinationException if an invalid destination is
1043 * specified.
1044 * @throws InvalidSelectorException if the message selector is invalid.
1045 * @since 1.1
1046 */
1047 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1048 return createConsumer(destination, messageSelector, false, messageListener);
1049 }
1050
1051 /**
1052 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1053 * using a message selector. This method can specify whether messages
1054 * published by its own connection should be delivered to it, if the
1055 * destination is a topic.
1056 * <P>
1057 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1058 * <CODE>Destination</CODE>, they can be used in the destination
1059 * parameter to create a <CODE>MessageConsumer</CODE>.
1060 * <P>
1061 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1062 * that have been published to a destination.
1063 * <P>
1064 * In some cases, a connection may both publish and subscribe to a topic.
1065 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1066 * inhibit the delivery of messages published by its own connection. The
1067 * default value for this attribute is False. The <CODE>noLocal</CODE>
1068 * value must be supported by destinations that are topics.
1069 *
1070 * @param destination the <CODE>Destination</CODE> to access
1071 * @param messageSelector only messages with properties matching the message
1072 * selector expression are delivered. A value of null or an
1073 * empty string indicates that there is no message selector
1074 * for the message consumer.
1075 * @param noLocal - if true, and the destination is a topic, inhibits the
1076 * delivery of messages published by its own connection. The
1077 * behavior for <CODE>NoLocal</CODE> is not specified if
1078 * the destination is a queue.
1079 * @return the MessageConsumer
1080 * @throws JMSException if the session fails to create a MessageConsumer due
1081 * to some internal error.
1082 * @throws InvalidDestinationException if an invalid destination is
1083 * specified.
1084 * @throws InvalidSelectorException if the message selector is invalid.
1085 * @since 1.1
1086 */
1087 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1088 return createConsumer(destination, messageSelector, noLocal, null);
1089 }
1090
1091 /**
1092 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1093 * using a message selector. This method can specify whether messages
1094 * published by its own connection should be delivered to it, if the
1095 * destination is a topic.
1096 * <P>
1097 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1098 * <CODE>Destination</CODE>, they can be used in the destination
1099 * parameter to create a <CODE>MessageConsumer</CODE>.
1100 * <P>
1101 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1102 * that have been published to a destination.
1103 * <P>
1104 * In some cases, a connection may both publish and subscribe to a topic.
1105 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1106 * inhibit the delivery of messages published by its own connection. The
1107 * default value for this attribute is False. The <CODE>noLocal</CODE>
1108 * value must be supported by destinations that are topics.
1109 *
1110 * @param destination the <CODE>Destination</CODE> to access
1111 * @param messageSelector only messages with properties matching the message
1112 * selector expression are delivered. A value of null or an
1113 * empty string indicates that there is no message selector
1114 * for the message consumer.
1115 * @param noLocal - if true, and the destination is a topic, inhibits the
1116 * delivery of messages published by its own connection. The
1117 * behavior for <CODE>NoLocal</CODE> is not specified if
1118 * the destination is a queue.
1119 * @param messageListener the listener to use for async consumption of messages
1120 * @return the MessageConsumer
1121 * @throws JMSException if the session fails to create a MessageConsumer due
1122 * to some internal error.
1123 * @throws InvalidDestinationException if an invalid destination is
1124 * specified.
1125 * @throws InvalidSelectorException if the message selector is invalid.
1126 * @since 1.1
1127 */
1128 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1129 checkClosed();
1130
1131 if (destination instanceof CustomDestination) {
1132 CustomDestination customDestination = (CustomDestination)destination;
1133 return customDestination.createConsumer(this, messageSelector, noLocal);
1134 }
1135
1136 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1137 int prefetch = 0;
1138 if (destination instanceof Topic) {
1139 prefetch = prefetchPolicy.getTopicPrefetch();
1140 } else {
1141 prefetch = prefetchPolicy.getQueuePrefetch();
1142 }
1143 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1144 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1145 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1146 }
1147
1148 /**
1149 * Creates a queue identity given a <CODE>Queue</CODE> name.
1150 * <P>
1151 * This facility is provided for the rare cases where clients need to
1152 * dynamically manipulate queue identity. It allows the creation of a queue
1153 * identity with a provider-specific name. Clients that depend on this
1154 * ability are not portable.
1155 * <P>
1156 * Note that this method is not for creating the physical queue. The
1157 * physical creation of queues is an administrative task and is not to be
1158 * initiated by the JMS API. The one exception is the creation of temporary
1159 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1160 * method.
1161 *
1162 * @param queueName the name of this <CODE>Queue</CODE>
1163 * @return a <CODE>Queue</CODE> with the given name
1164 * @throws JMSException if the session fails to create a queue due to some
1165 * internal error.
1166 * @since 1.1
1167 */
1168 public Queue createQueue(String queueName) throws JMSException {
1169 checkClosed();
1170 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1171 return new ActiveMQTempQueue(queueName);
1172 }
1173 return new ActiveMQQueue(queueName);
1174 }
1175
1176 /**
1177 * Creates a topic identity given a <CODE>Topic</CODE> name.
1178 * <P>
1179 * This facility is provided for the rare cases where clients need to
1180 * dynamically manipulate topic identity. This allows the creation of a
1181 * topic identity with a provider-specific name. Clients that depend on this
1182 * ability are not portable.
1183 * <P>
1184 * Note that this method is not for creating the physical topic. The
1185 * physical creation of topics is an administrative task and is not to be
1186 * initiated by the JMS API. The one exception is the creation of temporary
1187 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1188 * method.
1189 *
1190 * @param topicName the name of this <CODE>Topic</CODE>
1191 * @return a <CODE>Topic</CODE> with the given name
1192 * @throws JMSException if the session fails to create a topic due to some
1193 * internal error.
1194 * @since 1.1
1195 */
1196 public Topic createTopic(String topicName) throws JMSException {
1197 checkClosed();
1198 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1199 return new ActiveMQTempTopic(topicName);
1200 }
1201 return new ActiveMQTopic(topicName);
1202 }
1203
1204 /**
1205 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1206 * the specified queue.
1207 *
1208 * @param queue the <CODE>queue</CODE> to access
1209 * @exception InvalidDestinationException if an invalid destination is
1210 * specified
1211 * @since 1.1
1212 */
1213 /**
1214 * Creates a durable subscriber to the specified topic.
1215 * <P>
1216 * If a client needs to receive all the messages published on a topic,
1217 * including the ones published while the subscriber is inactive, it uses a
1218 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1219 * record of this durable subscription and insures that all messages from
1220 * the topic's publishers are retained until they are acknowledged by this
1221 * durable subscriber or they have expired.
1222 * <P>
1223 * Sessions with durable subscribers must always provide the same client
1224 * identifier. In addition, each client must specify a name that uniquely
1225 * identifies (within client identifier) each durable subscription it
1226 * creates. Only one session at a time can have a
1227 * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1228 * <P>
1229 * A client can change an existing durable subscription by creating a
1230 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1231 * and/or message selector. Changing a durable subscriber is equivalent to
1232 * unsubscribing (deleting) the old one and creating a new one.
1233 * <P>
1234 * In some cases, a connection may both publish and subscribe to a topic.
1235 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1236 * inhibit the delivery of messages published by its own connection. The
1237 * default value for this attribute is false.
1238 *
1239 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1240 * @param name the name used to identify this subscription
1241 * @return the TopicSubscriber
1242 * @throws JMSException if the session fails to create a subscriber due to
1243 * some internal error.
1244 * @throws InvalidDestinationException if an invalid topic is specified.
1245 * @since 1.1
1246 */
1247 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1248 checkClosed();
1249 return createDurableSubscriber(topic, name, null, false);
1250 }
1251
1252 /**
1253 * Creates a durable subscriber to the specified topic, using a message
1254 * selector and specifying whether messages published by its own connection
1255 * should be delivered to it.
1256 * <P>
1257 * If a client needs to receive all the messages published on a topic,
1258 * including the ones published while the subscriber is inactive, it uses a
1259 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1260 * record of this durable subscription and insures that all messages from
1261 * the topic's publishers are retained until they are acknowledged by this
1262 * durable subscriber or they have expired.
1263 * <P>
1264 * Sessions with durable subscribers must always provide the same client
1265 * identifier. In addition, each client must specify a name which uniquely
1266 * identifies (within client identifier) each durable subscription it
1267 * creates. Only one session at a time can have a
1268 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1269 * inactive durable subscriber is one that exists but does not currently
1270 * have a message consumer associated with it.
1271 * <P>
1272 * A client can change an existing durable subscription by creating a
1273 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1274 * and/or message selector. Changing a durable subscriber is equivalent to
1275 * unsubscribing (deleting) the old one and creating a new one.
1276 *
1277 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1278 * @param name the name used to identify this subscription
1279 * @param messageSelector only messages with properties matching the message
1280 * selector expression are delivered. A value of null or an
1281 * empty string indicates that there is no message selector
1282 * for the message consumer.
1283 * @param noLocal if set, inhibits the delivery of messages published by its
1284 * own connection
1285 * @return the Queue Browser
1286 * @throws JMSException if the session fails to create a subscriber due to
1287 * some internal error.
1288 * @throws InvalidDestinationException if an invalid topic is specified.
1289 * @throws InvalidSelectorException if the message selector is invalid.
1290 * @since 1.1
1291 */
1292 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1293 checkClosed();
1294
1295 if (topic == null) {
1296 throw new InvalidDestinationException("Topic cannot be null");
1297 }
1298
1299 if (isIndividualAcknowledge()) {
1300 throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
1301 "INDIVIDUAL_ACKNOWLEDGE mode.", null);
1302 }
1303
1304 if (topic instanceof CustomDestination) {
1305 CustomDestination customDestination = (CustomDestination)topic;
1306 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1307 }
1308
1309 connection.checkClientIDWasManuallySpecified();
1310 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1311 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1312 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1313 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1314 noLocal, false, asyncDispatch);
1315 }
1316
1317 /**
1318 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1319 * the specified queue.
1320 *
1321 * @param queue the <CODE>queue</CODE> to access
1322 * @return the Queue Browser
1323 * @throws JMSException if the session fails to create a browser due to some
1324 * internal error.
1325 * @throws InvalidDestinationException if an invalid destination is
1326 * specified
1327 * @since 1.1
1328 */
1329 public QueueBrowser createBrowser(Queue queue) throws JMSException {
1330 checkClosed();
1331 return createBrowser(queue, null);
1332 }
1333
1334 /**
1335 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1336 * the specified queue using a message selector.
1337 *
1338 * @param queue the <CODE>queue</CODE> to access
1339 * @param messageSelector only messages with properties matching the message
1340 * selector expression are delivered. A value of null or an
1341 * empty string indicates that there is no message selector
1342 * for the message consumer.
1343 * @return the Queue Browser
1344 * @throws JMSException if the session fails to create a browser due to some
1345 * internal error.
1346 * @throws InvalidDestinationException if an invalid destination is
1347 * specified
1348 * @throws InvalidSelectorException if the message selector is invalid.
1349 * @since 1.1
1350 */
1351 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1352 checkClosed();
1353 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1354 }
1355
1356 /**
1357 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1358 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1359 *
1360 * @return a temporary queue identity
1361 * @throws JMSException if the session fails to create a temporary queue due
1362 * to some internal error.
1363 * @since 1.1
1364 */
1365 public TemporaryQueue createTemporaryQueue() throws JMSException {
1366 checkClosed();
1367 return (TemporaryQueue)connection.createTempDestination(false);
1368 }
1369
1370 /**
1371 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1372 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1373 *
1374 * @return a temporary topic identity
1375 * @throws JMSException if the session fails to create a temporary topic due
1376 * to some internal error.
1377 * @since 1.1
1378 */
1379 public TemporaryTopic createTemporaryTopic() throws JMSException {
1380 checkClosed();
1381 return (TemporaryTopic)connection.createTempDestination(true);
1382 }
1383
1384 /**
1385 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1386 * the specified queue.
1387 *
1388 * @param queue the <CODE>Queue</CODE> to access
1389 * @return
1390 * @throws JMSException if the session fails to create a receiver due to
1391 * some internal error.
1392 * @throws JMSException
1393 * @throws InvalidDestinationException if an invalid queue is specified.
1394 */
1395 public QueueReceiver createReceiver(Queue queue) throws JMSException {
1396 checkClosed();
1397 return createReceiver(queue, null);
1398 }
1399
1400 /**
1401 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1402 * the specified queue using a message selector.
1403 *
1404 * @param queue the <CODE>Queue</CODE> to access
1405 * @param messageSelector only messages with properties matching the message
1406 * selector expression are delivered. A value of null or an
1407 * empty string indicates that there is no message selector
1408 * for the message consumer.
1409 * @return QueueReceiver
1410 * @throws JMSException if the session fails to create a receiver due to
1411 * some internal error.
1412 * @throws InvalidDestinationException if an invalid queue is specified.
1413 * @throws InvalidSelectorException if the message selector is invalid.
1414 */
1415 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1416 checkClosed();
1417
1418 if (queue instanceof CustomDestination) {
1419 CustomDestination customDestination = (CustomDestination)queue;
1420 return customDestination.createReceiver(this, messageSelector);
1421 }
1422
1423 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1424 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1425 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1426 }
1427
1428 /**
1429 * Creates a <CODE>QueueSender</CODE> object to send messages to the
1430 * specified queue.
1431 *
1432 * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1433 * unidentified producer
1434 * @return QueueSender
1435 * @throws JMSException if the session fails to create a sender due to some
1436 * internal error.
1437 * @throws InvalidDestinationException if an invalid queue is specified.
1438 */
1439 public QueueSender createSender(Queue queue) throws JMSException {
1440 checkClosed();
1441 if (queue instanceof CustomDestination) {
1442 CustomDestination customDestination = (CustomDestination)queue;
1443 return customDestination.createSender(this);
1444 }
1445 int timeSendOut = connection.getSendTimeout();
1446 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1447 }
1448
1449 /**
1450 * Creates a nondurable subscriber to the specified topic. <p/>
1451 * <P>
1452 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1453 * that have been published to a topic. <p/>
1454 * <P>
1455 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1456 * receive only messages that are published while they are active. <p/>
1457 * <P>
1458 * In some cases, a connection may both publish and subscribe to a topic.
1459 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1460 * inhibit the delivery of messages published by its own connection. The
1461 * default value for this attribute is false.
1462 *
1463 * @param topic the <CODE>Topic</CODE> to subscribe to
1464 * @return TopicSubscriber
1465 * @throws JMSException if the session fails to create a subscriber due to
1466 * some internal error.
1467 * @throws InvalidDestinationException if an invalid topic is specified.
1468 */
1469 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1470 checkClosed();
1471 return createSubscriber(topic, null, false);
1472 }
1473
1474 /**
1475 * Creates a nondurable subscriber to the specified topic, using a message
1476 * selector or specifying whether messages published by its own connection
1477 * should be delivered to it. <p/>
1478 * <P>
1479 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1480 * that have been published to a topic. <p/>
1481 * <P>
1482 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1483 * receive only messages that are published while they are active. <p/>
1484 * <P>
1485 * Messages filtered out by a subscriber's message selector will never be
1486 * delivered to the subscriber. From the subscriber's perspective, they do
1487 * not exist. <p/>
1488 * <P>
1489 * In some cases, a connection may both publish and subscribe to a topic.
1490 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1491 * inhibit the delivery of messages published by its own connection. The
1492 * default value for this attribute is false.
1493 *
1494 * @param topic the <CODE>Topic</CODE> to subscribe to
1495 * @param messageSelector only messages with properties matching the message
1496 * selector expression are delivered. A value of null or an
1497 * empty string indicates that there is no message selector
1498 * for the message consumer.
1499 * @param noLocal if set, inhibits the delivery of messages published by its
1500 * own connection
1501 * @return TopicSubscriber
1502 * @throws JMSException if the session fails to create a subscriber due to
1503 * some internal error.
1504 * @throws InvalidDestinationException if an invalid topic is specified.
1505 * @throws InvalidSelectorException if the message selector is invalid.
1506 */
1507 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1508 checkClosed();
1509
1510 if (topic instanceof CustomDestination) {
1511 CustomDestination customDestination = (CustomDestination)topic;
1512 return customDestination.createSubscriber(this, messageSelector, noLocal);
1513 }
1514
1515 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1516 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1517 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1518 }
1519
1520 /**
1521 * Creates a publisher for the specified topic. <p/>
1522 * <P>
1523 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1524 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1525 * a topic, it defines a new sequence of messages that have no ordering
1526 * relationship with the messages it has previously sent.
1527 *
1528 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1529 * an unidentified producer
1530 * @return TopicPublisher
1531 * @throws JMSException if the session fails to create a publisher due to
1532 * some internal error.
1533 * @throws InvalidDestinationException if an invalid topic is specified.
1534 */
1535 public TopicPublisher createPublisher(Topic topic) throws JMSException {
1536 checkClosed();
1537
1538 if (topic instanceof CustomDestination) {
1539 CustomDestination customDestination = (CustomDestination)topic;
1540 return customDestination.createPublisher(this);
1541 }
1542 int timeSendOut = connection.getSendTimeout();
1543 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1544 }
1545
1546 /**
1547 * Unsubscribes a durable subscription that has been created by a client.
1548 * <P>
1549 * This method deletes the state being maintained on behalf of the
1550 * subscriber by its provider.
1551 * <P>
1552 * It is erroneous for a client to delete a durable subscription while there
1553 * is an active <CODE>MessageConsumer </CODE> or
1554 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1555 * message is part of a pending transaction or has not been acknowledged in
1556 * the session.
1557 *
1558 * @param name the name used to identify this subscription
1559 * @throws JMSException if the session fails to unsubscribe to the durable
1560 * subscription due to some internal error.
1561 * @throws InvalidDestinationException if an invalid subscription name is
1562 * specified.
1563 * @since 1.1
1564 */
1565 public void unsubscribe(String name) throws JMSException {
1566 checkClosed();
1567 connection.unsubscribe(name);
1568 }
1569
1570 public void dispatch(MessageDispatch messageDispatch) {
1571 try {
1572 executor.execute(messageDispatch);
1573 } catch (InterruptedException e) {
1574 Thread.currentThread().interrupt();
1575 connection.onClientInternalException(e);
1576 }
1577 }
1578
1579 /**
1580 * Acknowledges all consumed messages of the session of this consumed
1581 * message.
1582 * <P>
1583 * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1584 * for use when a client has specified that its JMS session's consumed
1585 * messages are to be explicitly acknowledged. By invoking
1586 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1587 * all messages consumed by the session that the message was delivered to.
1588 * <P>
1589 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1590 * sessions and sessions specified to use implicit acknowledgement modes.
1591 * <P>
1592 * A client may individually acknowledge each message as it is consumed, or
1593 * it may choose to acknowledge messages as an application-defined group
1594 * (which is done by calling acknowledge on the last received message of the
1595 * group, thereby acknowledging all messages consumed by the session.)
1596 * <P>
1597 * Messages that have been received but not acknowledged may be redelivered.
1598 *
1599 * @throws JMSException if the JMS provider fails to acknowledge the
1600 * messages due to some internal error.
1601 * @throws javax.jms.IllegalStateException if this method is called on a
1602 * closed session.
1603 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1604 */
1605 public void acknowledge() throws JMSException {
1606 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1607 ActiveMQMessageConsumer c = iter.next();
1608 c.acknowledge();
1609 }
1610 }
1611
1612 /**
1613 * Add a message consumer.
1614 *
1615 * @param consumer - message consumer.
1616 * @throws JMSException
1617 */
1618 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1619 this.consumers.add(consumer);
1620 if (consumer.isDurableSubscriber()) {
1621 stats.onCreateDurableSubscriber();
1622 }
1623 this.connection.addDispatcher(consumer.getConsumerId(), this);
1624 }
1625
1626 /**
1627 * Remove the message consumer.
1628 *
1629 * @param consumer - consumer to be removed.
1630 * @throws JMSException
1631 */
1632 protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1633 this.connection.removeDispatcher(consumer.getConsumerId());
1634 if (consumer.isDurableSubscriber()) {
1635 stats.onRemoveDurableSubscriber();
1636 }
1637 this.consumers.remove(consumer);
1638 this.connection.removeDispatcher(consumer);
1639 }
1640
1641 /**
1642 * Adds a message producer.
1643 *
1644 * @param producer - message producer to be added.
1645 * @throws JMSException
1646 */
1647 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1648 this.producers.add(producer);
1649 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1650 }
1651
1652 /**
1653 * Removes a message producer.
1654 *
1655 * @param producer - message producer to be removed.
1656 * @throws JMSException
1657 */
1658 protected void removeProducer(ActiveMQMessageProducer producer) {
1659 this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1660 this.producers.remove(producer);
1661 }
1662
1663 /**
1664 * Start this Session.
1665 *
1666 * @throws JMSException
1667 */
1668 protected void start() throws JMSException {
1669 started.set(true);
1670 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1671 ActiveMQMessageConsumer c = iter.next();
1672 c.start();
1673 }
1674 executor.start();
1675 }
1676
1677 /**
1678 * Stops this session.
1679 *
1680 * @throws JMSException
1681 */
1682 protected void stop() throws JMSException {
1683
1684 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1685 ActiveMQMessageConsumer c = iter.next();
1686 c.stop();
1687 }
1688
1689 started.set(false);
1690 executor.stop();
1691 }
1692
1693 /**
1694 * Returns the session id.
1695 *
1696 * @return value - session id.
1697 */
1698 protected SessionId getSessionId() {
1699 return info.getSessionId();
1700 }
1701
1702 /**
1703 * @return
1704 */
1705 protected ConsumerId getNextConsumerId() {
1706 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1707 }
1708
1709 /**
1710 * @return
1711 */
1712 protected ProducerId getNextProducerId() {
1713 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1714 }
1715
1716 /**
1717 * Sends the message for dispatch by the broker.
1718 *
1719 *
1720 * @param producer - message producer.
1721 * @param destination - message destination.
1722 * @param message - message to be sent.
1723 * @param deliveryMode - JMS messsage delivery mode.
1724 * @param priority - message priority.
1725 * @param timeToLive - message expiration.
1726 * @param producerWindow
1727 * @param onComplete
1728 * @throws JMSException
1729 */
1730 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1731 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1732
1733 checkClosed();
1734 if (destination.isTemporary() && connection.isDeleted(destination)) {
1735 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1736 }
1737 synchronized (sendMutex) {
1738 // tell the Broker we are about to start a new transaction
1739 doStartTransaction();
1740 TransactionId txid = transactionContext.getTransactionId();
1741 long sequenceNumber = producer.getMessageSequence();
1742
1743 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1744 message.setJMSDeliveryMode(deliveryMode);
1745 long expiration = 0L;
1746 if (!producer.getDisableMessageTimestamp()) {
1747 long timeStamp = System.currentTimeMillis();
1748 message.setJMSTimestamp(timeStamp);
1749 if (timeToLive > 0) {
1750 expiration = timeToLive + timeStamp;
1751 }
1752 }
1753 message.setJMSExpiration(expiration);
1754 message.setJMSPriority(priority);
1755 message.setJMSRedelivered(false);
1756
1757 // transform to our own message format here
1758 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1759 msg.setDestination(destination);
1760 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1761
1762 // Set the message id.
1763 if (msg != message) {
1764 message.setJMSMessageID(msg.getMessageId().toString());
1765 // Make sure the JMS destination is set on the foreign messages too.
1766 message.setJMSDestination(destination);
1767 }
1768 //clear the brokerPath in case we are re-sending this message
1769 msg.setBrokerPath(null);
1770
1771 msg.setTransactionId(txid);
1772 if (connection.isCopyMessageOnSend()) {
1773 msg = (ActiveMQMessage)msg.copy();
1774 }
1775 msg.setConnection(connection);
1776 msg.onSend();
1777 msg.setProducerId(msg.getMessageId().getProducerId());
1778 if (LOG.isTraceEnabled()) {
1779 LOG.trace(getSessionId() + " sending message: " + msg);
1780 }
1781 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1782 this.connection.asyncSendPacket(msg);
1783 if (producerWindow != null) {
1784 // Since we defer lots of the marshaling till we hit the
1785 // wire, this might not
1786 // provide and accurate size. We may change over to doing
1787 // more aggressive marshaling,
1788 // to get more accurate sizes.. this is more important once
1789 // users start using producer window
1790 // flow control.
1791 int size = msg.getSize();
1792 producerWindow.increaseUsage(size);
1793 }
1794 } else {
1795 if (sendTimeout > 0 && onComplete==null) {
1796 this.connection.syncSendPacket(msg,sendTimeout);
1797 }else {
1798 this.connection.syncSendPacket(msg, onComplete);
1799 }
1800 }
1801
1802 }
1803 }
1804
1805 /**
1806 * Send TransactionInfo to indicate transaction has started
1807 *
1808 * @throws JMSException if some internal error occurs
1809 */
1810 protected void doStartTransaction() throws JMSException {
1811 if (getTransacted() && !transactionContext.isInXATransaction()) {
1812 transactionContext.begin();
1813 }
1814 }
1815
1816 /**
1817 * Checks whether the session has unconsumed messages.
1818 *
1819 * @return true - if there are unconsumed messages.
1820 */
1821 public boolean hasUncomsumedMessages() {
1822 return executor.hasUncomsumedMessages();
1823 }
1824
1825 /**
1826 * Checks whether the session uses transactions.
1827 *
1828 * @return true - if the session uses transactions.
1829 */
1830 public boolean isTransacted() {
1831 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1832 }
1833
1834 /**
1835 * Checks whether the session used client acknowledgment.
1836 *
1837 * @return true - if the session uses client acknowledgment.
1838 */
1839 protected boolean isClientAcknowledge() {
1840 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1841 }
1842
1843 /**
1844 * Checks whether the session used auto acknowledgment.
1845 *
1846 * @return true - if the session uses client acknowledgment.
1847 */
1848 public boolean isAutoAcknowledge() {
1849 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1850 }
1851
1852 /**
1853 * Checks whether the session used dup ok acknowledgment.
1854 *
1855 * @return true - if the session uses client acknowledgment.
1856 */
1857 public boolean isDupsOkAcknowledge() {
1858 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1859 }
1860
1861 public boolean isIndividualAcknowledge(){
1862 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1863 }
1864
1865 /**
1866 * Returns the message delivery listener.
1867 *
1868 * @return deliveryListener - message delivery listener.
1869 */
1870 public DeliveryListener getDeliveryListener() {
1871 return deliveryListener;
1872 }
1873
1874 /**
1875 * Sets the message delivery listener.
1876 *
1877 * @param deliveryListener - message delivery listener.
1878 */
1879 public void setDeliveryListener(DeliveryListener deliveryListener) {
1880 this.deliveryListener = deliveryListener;
1881 }
1882
1883 /**
1884 * Returns the SessionInfo bean.
1885 *
1886 * @return info - SessionInfo bean.
1887 * @throws JMSException
1888 */
1889 protected SessionInfo getSessionInfo() throws JMSException {
1890 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1891 return info;
1892 }
1893
1894 /**
1895 * Send the asynchronus command.
1896 *
1897 * @param command - command to be executed.
1898 * @throws JMSException
1899 */
1900 public void asyncSendPacket(Command command) throws JMSException {
1901 connection.asyncSendPacket(command);
1902 }
1903
1904 /**
1905 * Send the synchronus command.
1906 *
1907 * @param command - command to be executed.
1908 * @return Response
1909 * @throws JMSException
1910 */
1911 public Response syncSendPacket(Command command) throws JMSException {
1912 return connection.syncSendPacket(command);
1913 }
1914
1915 public long getNextDeliveryId() {
1916 return deliveryIdGenerator.getNextSequenceId();
1917 }
1918
1919 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1920
1921 List<MessageDispatch> c = unconsumedMessages.removeAll();
1922 for (MessageDispatch md : c) {
1923 this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1924 }
1925 Collections.reverse(c);
1926
1927 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1928 MessageDispatch md = iter.next();
1929 executor.executeFirst(md);
1930 }
1931
1932 }
1933
1934 public boolean isRunning() {
1935 return started.get();
1936 }
1937
1938 public boolean isAsyncDispatch() {
1939 return asyncDispatch;
1940 }
1941
1942 public void setAsyncDispatch(boolean asyncDispatch) {
1943 this.asyncDispatch = asyncDispatch;
1944 }
1945
1946 /**
1947 * @return Returns the sessionAsyncDispatch.
1948 */
1949 public boolean isSessionAsyncDispatch() {
1950 return sessionAsyncDispatch;
1951 }
1952
1953 /**
1954 * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1955 */
1956 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1957 this.sessionAsyncDispatch = sessionAsyncDispatch;
1958 }
1959
1960 public MessageTransformer getTransformer() {
1961 return transformer;
1962 }
1963
1964 public ActiveMQConnection getConnection() {
1965 return connection;
1966 }
1967
1968 /**
1969 * Sets the transformer used to transform messages before they are sent on
1970 * to the JMS bus or when they are received from the bus but before they are
1971 * delivered to the JMS client
1972 */
1973 public void setTransformer(MessageTransformer transformer) {
1974 this.transformer = transformer;
1975 }
1976
1977 public BlobTransferPolicy getBlobTransferPolicy() {
1978 return blobTransferPolicy;
1979 }
1980
1981 /**
1982 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1983 * OBjects) are transferred from producers to brokers to consumers
1984 */
1985 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1986 this.blobTransferPolicy = blobTransferPolicy;
1987 }
1988
1989 public List<MessageDispatch> getUnconsumedMessages() {
1990 return executor.getUnconsumedMessages();
1991 }
1992
1993 @Override
1994 public String toString() {
1995 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1996 }
1997
1998 public void checkMessageListener() throws JMSException {
1999 if (messageListener != null) {
2000 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2001 }
2002 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
2003 ActiveMQMessageConsumer consumer = i.next();
2004 if (consumer.getMessageListener() != null) {
2005 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2006 }
2007 }
2008 }
2009
2010 protected void setOptimizeAcknowledge(boolean value) {
2011 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2012 ActiveMQMessageConsumer c = iter.next();
2013 c.setOptimizeAcknowledge(value);
2014 }
2015 }
2016
2017 protected void setPrefetchSize(ConsumerId id, int prefetch) {
2018 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2019 ActiveMQMessageConsumer c = iter.next();
2020 if (c.getConsumerId().equals(id)) {
2021 c.setPrefetchSize(prefetch);
2022 break;
2023 }
2024 }
2025 }
2026
2027 protected void close(ConsumerId id) {
2028 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2029 ActiveMQMessageConsumer c = iter.next();
2030 if (c.getConsumerId().equals(id)) {
2031 try {
2032 c.close();
2033 } catch (JMSException e) {
2034 LOG.warn("Exception closing consumer", e);
2035 }
2036 LOG.warn("Closed consumer on Command");
2037 break;
2038 }
2039 }
2040 }
2041
2042 public boolean isInUse(ActiveMQTempDestination destination) {
2043 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2044 ActiveMQMessageConsumer c = iter.next();
2045 if (c.isInUse(destination)) {
2046 return true;
2047 }
2048 }
2049 return false;
2050 }
2051
2052 /**
2053 * highest sequence id of the last message delivered by this session.
2054 * Passed to the broker in the close command, maintained by dispose()
2055 * @return lastDeliveredSequenceId
2056 */
2057 public long getLastDeliveredSequenceId() {
2058 return lastDeliveredSequenceId;
2059 }
2060
2061 protected void sendAck(MessageAck ack) throws JMSException {
2062 sendAck(ack,false);
2063 }
2064
2065 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2066 if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2067 asyncSendPacket(ack);
2068 } else {
2069 syncSendPacket(ack);
2070 }
2071 }
2072
2073 protected Scheduler getScheduler() throws JMSException {
2074 return this.connection.getScheduler();
2075 }
2076
2077 protected ThreadPoolExecutor getConnectionExecutor() {
2078 return this.connectionExecutor;
2079 }
2080 }