001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.util.HashMap;
020 import java.util.Iterator;
021 import java.util.LinkedList;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.concurrent.ExecutorService;
025 import java.util.concurrent.Executors;
026 import java.util.concurrent.TimeUnit;
027 import java.util.concurrent.atomic.AtomicBoolean;
028 import java.util.concurrent.atomic.AtomicReference;
029
030 import javax.jms.IllegalStateException;
031 import javax.jms.InvalidDestinationException;
032 import javax.jms.JMSException;
033 import javax.jms.Message;
034 import javax.jms.MessageListener;
035
036 import org.apache.activemq.command.ActiveMQDestination;
037 import org.apache.activemq.command.ActiveMQMessage;
038 import org.apache.activemq.command.ActiveMQTempDestination;
039 import org.apache.activemq.command.ConsumerId;
040 import org.apache.activemq.command.ConsumerInfo;
041 import org.apache.activemq.command.MessageAck;
042 import org.apache.activemq.command.MessageDispatch;
043 import org.apache.activemq.command.MessageId;
044 import org.apache.activemq.command.MessagePull;
045 import org.apache.activemq.management.JMSConsumerStatsImpl;
046 import org.apache.activemq.management.StatsCapable;
047 import org.apache.activemq.management.StatsImpl;
048 import org.apache.activemq.selector.SelectorParser;
049 import org.apache.activemq.thread.Scheduler;
050 import org.apache.activemq.transaction.Synchronization;
051 import org.apache.activemq.util.Callback;
052 import org.apache.activemq.util.IntrospectionSupport;
053 import org.apache.activemq.util.JMSExceptionSupport;
054 import org.apache.commons.logging.Log;
055 import org.apache.commons.logging.LogFactory;
056
057 /**
058 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
059 * from a destination. A <CODE> MessageConsumer</CODE> object is created by
060 * passing a <CODE>Destination</CODE> object to a message-consumer creation
061 * method supplied by a session.
062 * <P>
063 * <CODE>MessageConsumer</CODE> is the parent interface for all message
064 * consumers.
065 * <P>
066 * A message consumer can be created with a message selector. A message selector
067 * allows the client to restrict the messages delivered to the message consumer
068 * to those that match the selector.
069 * <P>
070 * A client may either synchronously receive a message consumer's messages or
071 * have the consumer asynchronously deliver them as they arrive.
072 * <P>
073 * For synchronous receipt, a client can request the next message from a message
074 * consumer using one of its <CODE> receive</CODE> methods. There are several
075 * variations of <CODE>receive</CODE> that allow a client to poll or wait for
076 * the next message.
077 * <P>
078 * For asynchronous delivery, a client can register a
079 * <CODE>MessageListener</CODE> object with a message consumer. As messages
080 * arrive at the message consumer, it delivers them by calling the
081 * <CODE>MessageListener</CODE>'s<CODE>
082 * onMessage</CODE> method.
083 * <P>
084 * It is a client programming error for a <CODE>MessageListener</CODE> to
085 * throw an exception.
086 *
087 * @version $Revision: 1.22 $
088 * @see javax.jms.MessageConsumer
089 * @see javax.jms.QueueReceiver
090 * @see javax.jms.TopicSubscriber
091 * @see javax.jms.Session
092 */
093 public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
094
095 private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
096
097 protected final ActiveMQSession session;
098 protected final ConsumerInfo info;
099
100 // These are the messages waiting to be delivered to the client
101 private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
102
103 // The are the messages that were delivered to the consumer but that have
104 // not been acknowledged. It's kept in reverse order since we
105 // Always walk list in reverse order. Only used when session is client ack.
106 private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
107 private int deliveredCounter;
108 private int additionalWindowSize;
109 private long redeliveryDelay;
110 private int ackCounter;
111 private int dispatchedCount;
112 private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
113 private JMSConsumerStatsImpl stats;
114
115 private final String selector;
116 private boolean synchronizationRegistered;
117 private AtomicBoolean started = new AtomicBoolean(false);
118
119 private MessageAvailableListener availableListener;
120
121 private RedeliveryPolicy redeliveryPolicy;
122 private boolean optimizeAcknowledge;
123 private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
124 private ExecutorService executorService;
125 private MessageTransformer transformer;
126 private boolean clearDispatchList;
127
128 private MessageAck pendingAck;
129
130 /**
131 * Create a MessageConsumer
132 *
133 * @param session
134 * @param dest
135 * @param name
136 * @param selector
137 * @param prefetch
138 * @param maximumPendingMessageCount TODO
139 * @param noLocal
140 * @param browser
141 * @param dispatchAsync
142 * @param messageListener
143 * @throws JMSException
144 */
145 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
146 String name, String selector, int prefetch,
147 int maximumPendingMessageCount, boolean noLocal, boolean browser,
148 boolean dispatchAsync, MessageListener messageListener) throws JMSException {
149 if (dest == null) {
150 throw new InvalidDestinationException("Don't understand null destinations");
151 } else if (dest.getPhysicalName() == null) {
152 throw new InvalidDestinationException("The destination object was not given a physical name.");
153 } else if (dest.isTemporary()) {
154 String physicalName = dest.getPhysicalName();
155
156 if (physicalName == null) {
157 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
158 }
159
160 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
161
162 if (physicalName.indexOf(connectionID) < 0) {
163 throw new InvalidDestinationException(
164 "Cannot use a Temporary destination from another Connection");
165 }
166
167 if (session.connection.isDeleted(dest)) {
168 throw new InvalidDestinationException(
169 "Cannot use a Temporary destination that has been deleted");
170 }
171 if (prefetch < 0) {
172 throw new JMSException("Cannot have a prefetch size less than zero");
173 }
174 }
175
176 this.session = session;
177 this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
178 setTransformer(session.getTransformer());
179
180 this.info = new ConsumerInfo(consumerId);
181 this.info.setExclusive(this.session.connection.isExclusiveConsumer());
182 this.info.setSubscriptionName(name);
183 this.info.setPrefetchSize(prefetch);
184 this.info.setCurrentPrefetchSize(prefetch);
185 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
186 this.info.setNoLocal(noLocal);
187 this.info.setDispatchAsync(dispatchAsync);
188 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
189 this.info.setSelector(null);
190
191 // Allows the options on the destination to configure the consumerInfo
192 if (dest.getOptions() != null) {
193 Map<String, String> options = new HashMap<String, String>(dest.getOptions());
194 IntrospectionSupport.setProperties(this.info, options, "consumer.");
195 }
196
197 this.info.setDestination(dest);
198 this.info.setBrowser(browser);
199 if (selector != null && selector.trim().length() != 0) {
200 // Validate the selector
201 new SelectorParser().parse(selector);
202 this.info.setSelector(selector);
203 this.selector = selector;
204 } else if (info.getSelector() != null) {
205 // Validate the selector
206 new SelectorParser().parse(this.info.getSelector());
207 this.selector = this.info.getSelector();
208 } else {
209 this.selector = null;
210 }
211
212 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
213 this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
214 && !info.isBrowser();
215 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
216
217 if (messageListener != null) {
218 setMessageListener(messageListener);
219 }
220 try {
221 this.session.addConsumer(this);
222 this.session.syncSendPacket(info);
223 } catch (JMSException e) {
224 this.session.removeConsumer(this);
225 throw e;
226 }
227
228 if (session.connection.isStarted()) {
229 start();
230 }
231 }
232
233 public StatsImpl getStats() {
234 return stats;
235 }
236
237 public JMSConsumerStatsImpl getConsumerStats() {
238 return stats;
239 }
240
241 public RedeliveryPolicy getRedeliveryPolicy() {
242 return redeliveryPolicy;
243 }
244
245 /**
246 * Sets the redelivery policy used when messages are redelivered
247 */
248 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
249 this.redeliveryPolicy = redeliveryPolicy;
250 }
251
252 public MessageTransformer getTransformer() {
253 return transformer;
254 }
255
256 /**
257 * Sets the transformer used to transform messages before they are sent on
258 * to the JMS bus
259 */
260 public void setTransformer(MessageTransformer transformer) {
261 this.transformer = transformer;
262 }
263
264 /**
265 * @return Returns the value.
266 */
267 public ConsumerId getConsumerId() {
268 return info.getConsumerId();
269 }
270
271 /**
272 * @return the consumer name - used for durable consumers
273 */
274 public String getConsumerName() {
275 return this.info.getSubscriptionName();
276 }
277
278 /**
279 * @return true if this consumer does not accept locally produced messages
280 */
281 protected boolean isNoLocal() {
282 return info.isNoLocal();
283 }
284
285 /**
286 * Retrieve is a browser
287 *
288 * @return true if a browser
289 */
290 protected boolean isBrowser() {
291 return info.isBrowser();
292 }
293
294 /**
295 * @return ActiveMQDestination
296 */
297 protected ActiveMQDestination getDestination() {
298 return info.getDestination();
299 }
300
301 /**
302 * @return Returns the prefetchNumber.
303 */
304 public int getPrefetchNumber() {
305 return info.getPrefetchSize();
306 }
307
308 /**
309 * @return true if this is a durable topic subscriber
310 */
311 public boolean isDurableSubscriber() {
312 return info.getSubscriptionName() != null && info.getDestination().isTopic();
313 }
314
315 /**
316 * Gets this message consumer's message selector expression.
317 *
318 * @return this message consumer's message selector, or null if no message
319 * selector exists for the message consumer (that is, if the message
320 * selector was not set or was set to null or the empty string)
321 * @throws JMSException if the JMS provider fails to receive the next
322 * message due to some internal error.
323 */
324 public String getMessageSelector() throws JMSException {
325 checkClosed();
326 return selector;
327 }
328
329 /**
330 * Gets the message consumer's <CODE>MessageListener</CODE>.
331 *
332 * @return the listener for the message consumer, or null if no listener is
333 * set
334 * @throws JMSException if the JMS provider fails to get the message
335 * listener due to some internal error.
336 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
337 */
338 public MessageListener getMessageListener() throws JMSException {
339 checkClosed();
340 return this.messageListener.get();
341 }
342
343 /**
344 * Sets the message consumer's <CODE>MessageListener</CODE>.
345 * <P>
346 * Setting the message listener to null is the equivalent of unsetting the
347 * message listener for the message consumer.
348 * <P>
349 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
350 * while messages are being consumed by an existing listener or the consumer
351 * is being used to consume messages synchronously is undefined.
352 *
353 * @param listener the listener to which the messages are to be delivered
354 * @throws JMSException if the JMS provider fails to receive the next
355 * message due to some internal error.
356 * @see javax.jms.MessageConsumer#getMessageListener
357 */
358 public void setMessageListener(MessageListener listener) throws JMSException {
359 checkClosed();
360 if (info.getPrefetchSize() == 0) {
361 throw new JMSException(
362 "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
363 }
364 if (listener != null) {
365 boolean wasRunning = session.isRunning();
366 if (wasRunning) {
367 session.stop();
368 }
369
370 this.messageListener.set(listener);
371 session.redispatch(this, unconsumedMessages);
372
373 if (wasRunning) {
374 session.start();
375 }
376 } else {
377 this.messageListener.set(null);
378 }
379 }
380
381 public MessageAvailableListener getAvailableListener() {
382 return availableListener;
383 }
384
385 /**
386 * Sets the listener used to notify synchronous consumers that there is a
387 * message available so that the {@link MessageConsumer#receiveNoWait()} can
388 * be called.
389 */
390 public void setAvailableListener(MessageAvailableListener availableListener) {
391 this.availableListener = availableListener;
392 }
393
394 /**
395 * Used to get an enqueued message from the unconsumedMessages list. The
396 * amount of time this method blocks is based on the timeout value. - if
397 * timeout==-1 then it blocks until a message is received. - if timeout==0
398 * then it it tries to not block at all, it returns a message if it is
399 * available - if timeout>0 then it blocks up to timeout amount of time.
400 * Expired messages will consumed by this method.
401 *
402 * @throws JMSException
403 * @return null if we timeout or if the consumer is closed.
404 */
405 private MessageDispatch dequeue(long timeout) throws JMSException {
406 try {
407 long deadline = 0;
408 if (timeout > 0) {
409 deadline = System.currentTimeMillis() + timeout;
410 }
411 while (true) {
412 MessageDispatch md = unconsumedMessages.dequeue(timeout);
413 if (md == null) {
414 if (timeout > 0 && !unconsumedMessages.isClosed()) {
415 timeout = Math.max(deadline - System.currentTimeMillis(), 0);
416 } else {
417 return null;
418 }
419 } else if (md.getMessage() == null) {
420 return null;
421 } else if (md.getMessage().isExpired()) {
422 if (LOG.isDebugEnabled()) {
423 LOG.debug(getConsumerId() + " received expired message: " + md);
424 }
425 beforeMessageIsConsumed(md);
426 afterMessageIsConsumed(md, true);
427 if (timeout > 0) {
428 timeout = Math.max(deadline - System.currentTimeMillis(), 0);
429 }
430 } else {
431 if (LOG.isDebugEnabled()) {
432 LOG.debug(getConsumerId() + " received message: " + md);
433 }
434 return md;
435 }
436 }
437 } catch (InterruptedException e) {
438 Thread.currentThread().interrupt();
439 throw JMSExceptionSupport.create(e);
440 }
441 }
442
443 /**
444 * Receives the next message produced for this message consumer.
445 * <P>
446 * This call blocks indefinitely until a message is produced or until this
447 * message consumer is closed.
448 * <P>
449 * If this <CODE>receive</CODE> is done within a transaction, the consumer
450 * retains the message until the transaction commits.
451 *
452 * @return the next message produced for this message consumer, or null if
453 * this message consumer is concurrently closed
454 */
455 public Message receive() throws JMSException {
456 checkClosed();
457 checkMessageListener();
458
459 sendPullCommand(0);
460 MessageDispatch md = dequeue(-1);
461 if (md == null) {
462 return null;
463 }
464
465 beforeMessageIsConsumed(md);
466 afterMessageIsConsumed(md, false);
467
468 return createActiveMQMessage(md);
469 }
470
471 /**
472 * @param md
473 * @return
474 */
475 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
476 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
477 if (transformer != null) {
478 Message transformedMessage = transformer.consumerTransform(session, this, m);
479 if (transformedMessage != null) {
480 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
481 }
482 }
483 if (session.isClientAcknowledge()) {
484 m.setAcknowledgeCallback(new Callback() {
485 public void execute() throws Exception {
486 session.checkClosed();
487 session.acknowledge();
488 }
489 });
490 }else if (session.isIndividualAcknowledge()) {
491 m.setAcknowledgeCallback(new Callback() {
492 public void execute() throws Exception {
493 session.checkClosed();
494 acknowledge(md);
495 }
496 });
497 }
498 return m;
499 }
500
501 /**
502 * Receives the next message that arrives within the specified timeout
503 * interval.
504 * <P>
505 * This call blocks until a message arrives, the timeout expires, or this
506 * message consumer is closed. A <CODE>timeout</CODE> of zero never
507 * expires, and the call blocks indefinitely.
508 *
509 * @param timeout the timeout value (in milliseconds), a time out of zero
510 * never expires.
511 * @return the next message produced for this message consumer, or null if
512 * the timeout expires or this message consumer is concurrently
513 * closed
514 */
515 public Message receive(long timeout) throws JMSException {
516 checkClosed();
517 checkMessageListener();
518 if (timeout == 0) {
519 return this.receive();
520
521 }
522
523 sendPullCommand(timeout);
524 while (timeout > 0) {
525
526 MessageDispatch md;
527 if (info.getPrefetchSize() == 0) {
528 md = dequeue(-1); // We let the broker let us know when we
529 // timeout.
530 } else {
531 md = dequeue(timeout);
532 }
533
534 if (md == null) {
535 return null;
536 }
537
538 beforeMessageIsConsumed(md);
539 afterMessageIsConsumed(md, false);
540 return createActiveMQMessage(md);
541 }
542 return null;
543 }
544
545 /**
546 * Receives the next message if one is immediately available.
547 *
548 * @return the next message produced for this message consumer, or null if
549 * one is not available
550 * @throws JMSException if the JMS provider fails to receive the next
551 * message due to some internal error.
552 */
553 public Message receiveNoWait() throws JMSException {
554 checkClosed();
555 checkMessageListener();
556 sendPullCommand(-1);
557
558 MessageDispatch md;
559 if (info.getPrefetchSize() == 0) {
560 md = dequeue(-1); // We let the broker let us know when we
561 // timeout.
562 } else {
563 md = dequeue(0);
564 }
565
566 if (md == null) {
567 return null;
568 }
569
570 beforeMessageIsConsumed(md);
571 afterMessageIsConsumed(md, false);
572 return createActiveMQMessage(md);
573 }
574
575 /**
576 * Closes the message consumer.
577 * <P>
578 * Since a provider may allocate some resources on behalf of a <CODE>
579 * MessageConsumer</CODE>
580 * outside the Java virtual machine, clients should close them when they are
581 * not needed. Relying on garbage collection to eventually reclaim these
582 * resources may not be timely enough.
583 * <P>
584 * This call blocks until a <CODE>receive</CODE> or message listener in
585 * progress has completed. A blocked message consumer <CODE>receive </CODE>
586 * call returns null when this message consumer is closed.
587 *
588 * @throws JMSException if the JMS provider fails to close the consumer due
589 * to some internal error.
590 */
591 public void close() throws JMSException {
592 if (!unconsumedMessages.isClosed()) {
593 dispose();
594 this.session.asyncSendPacket(info.createRemoveCommand());
595 }
596 }
597
598 void clearMessagesInProgress() {
599 // we are called from inside the transport reconnection logic
600 // which involves us clearing all the connections' consumers
601 // dispatch lists and clearing them
602 // so rather than trying to grab a mutex (which could be already
603 // owned by the message listener calling the send) we will just set
604 // a flag so that the list can be cleared as soon as the
605 // dispatch thread is ready to flush the dispatch list
606 clearDispatchList = true;
607 }
608
609 void deliverAcks() {
610 MessageAck ack = null;
611 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
612 if (this.optimizeAcknowledge) {
613 synchronized(deliveredMessages) {
614 ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
615 if (ack != null) {
616 deliveredMessages.clear();
617 ackCounter = 0;
618 }
619 }
620 } else {
621 ack = pendingAck;
622 }
623 if (ack != null) {
624 final MessageAck ackToSend = ack;
625 if (executorService == null) {
626 executorService = Executors.newSingleThreadExecutor();
627 }
628 executorService.submit(new Runnable() {
629 public void run() {
630 try {
631 session.sendAck(ackToSend,true);
632 } catch (JMSException e) {
633 LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
634 } finally {
635 deliveryingAcknowledgements.set(false);
636 }
637 }
638 });
639 } else {
640 deliveryingAcknowledgements.set(false);
641 }
642 }
643 }
644
645 public void dispose() throws JMSException {
646 if (!unconsumedMessages.isClosed()) {
647
648 // if ( !deliveredMessages.isEmpty() ) {
649 // // We need to let the broker know how many times that message
650 // // was rolled back.
651 // rollbackCounter++;
652 // MessageDispatch lastMd = deliveredMessages.get(0);
653 // }
654
655 // Do we have any acks we need to send out before closing?
656 // Ack any delivered messages now. (session may still
657 // commit/rollback the acks).
658 // only processes optimized acknowledgements
659 deliverAcks();
660 if (executorService != null) {
661 executorService.shutdown();
662 try {
663 executorService.awaitTermination(60, TimeUnit.SECONDS);
664 } catch (InterruptedException e) {
665 Thread.currentThread().interrupt();
666 }
667 }
668 if (session.isTransacted() || session.isDupsOkAcknowledge()) {
669 acknowledge();
670 }
671 if (session.isClientAcknowledge()) {
672 if (!this.info.isBrowser()) {
673 // rollback duplicates that aren't acknowledged
674 for (MessageDispatch old : deliveredMessages) {
675 session.connection.rollbackDuplicate(this, old.getMessage());
676 }
677 }
678 }
679 synchronized(deliveredMessages) {
680 deliveredMessages.clear();
681 }
682 List<MessageDispatch> list = unconsumedMessages.removeAll();
683 if (!this.info.isBrowser()) {
684 for (MessageDispatch old : list) {
685 // ensure we don't filter this as a duplicate
686 session.connection.rollbackDuplicate(this, old.getMessage());
687 }
688 }
689 unconsumedMessages.close();
690 this.session.removeConsumer(this);
691 }
692 }
693
694 /**
695 * @throws IllegalStateException
696 */
697 protected void checkClosed() throws IllegalStateException {
698 if (unconsumedMessages.isClosed()) {
699 throw new IllegalStateException("The Consumer is closed");
700 }
701 }
702
703 /**
704 * If we have a zero prefetch specified then send a pull command to the
705 * broker to pull a message we are about to receive
706 */
707 protected void sendPullCommand(long timeout) throws JMSException {
708 if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
709 MessagePull messagePull = new MessagePull();
710 messagePull.configure(info);
711 messagePull.setTimeout(timeout);
712 session.asyncSendPacket(messagePull);
713 }
714 }
715
716 protected void checkMessageListener() throws JMSException {
717 session.checkMessageListener();
718 }
719
720 protected void setOptimizeAcknowledge(boolean value) {
721 if (optimizeAcknowledge && !value) {
722 deliverAcks();
723 }
724 optimizeAcknowledge = value;
725 }
726
727 protected void setPrefetchSize(int prefetch) {
728 deliverAcks();
729 this.info.setCurrentPrefetchSize(prefetch);
730 }
731
732 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
733 md.setDeliverySequenceId(session.getNextDeliveryId());
734 if (!session.isDupsOkAcknowledge()) {
735 synchronized(deliveredMessages) {
736 deliveredMessages.addFirst(md);
737 }
738 if (session.isTransacted()) {
739 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
740 }
741 }
742 }
743
744 private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
745 if (unconsumedMessages.isClosed()) {
746 return;
747 }
748 if (messageExpired) {
749 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
750 } else {
751 stats.onMessage();
752 if (session.isTransacted()) {
753 // Do nothing.
754 } else if (session.isAutoAcknowledge()) {
755 synchronized (deliveredMessages) {
756 if (!deliveredMessages.isEmpty()) {
757 if (optimizeAcknowledge) {
758 if (deliveryingAcknowledgements.compareAndSet(
759 false, true)) {
760 ackCounter++;
761 if (ackCounter >= (info
762 .getCurrentPrefetchSize() * .65)) {
763 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
764 if (ack != null) {
765 deliveredMessages.clear();
766 ackCounter = 0;
767 session.sendAck(ack);
768 }
769 }
770 deliveryingAcknowledgements.set(false);
771 }
772 } else {
773 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
774 if (ack!=null) {
775 deliveredMessages.clear();
776 session.sendAck(ack);
777 }
778 }
779 }
780 }
781 } else if (session.isDupsOkAcknowledge()) {
782 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
783 } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
784 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
785 }
786 else {
787 throw new IllegalStateException("Invalid session state.");
788 }
789 }
790 }
791
792 /**
793 * Creates a MessageAck for all messages contained in deliveredMessages.
794 * Caller should hold the lock for deliveredMessages.
795 *
796 * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
797 * @return <code>null</code> if nothing to ack.
798 */
799 private MessageAck makeAckForAllDeliveredMessages(byte type) {
800 synchronized (deliveredMessages) {
801 if (deliveredMessages.isEmpty())
802 return null;
803
804 MessageDispatch md = deliveredMessages.getFirst();
805 MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
806 ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
807 return ack;
808 }
809 }
810
811 private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
812
813 // Don't acknowledge now, but we may need to let the broker know the
814 // consumer got the message
815 // to expand the pre-fetch window
816 if (session.isTransacted()) {
817 session.doStartTransaction();
818 if (!synchronizationRegistered) {
819 synchronizationRegistered = true;
820 session.getTransactionContext().addSynchronization(new Synchronization() {
821 public void beforeEnd() throws Exception {
822 acknowledge();
823 synchronizationRegistered = false;
824 }
825
826 public void afterCommit() throws Exception {
827 commit();
828 synchronizationRegistered = false;
829 }
830
831 public void afterRollback() throws Exception {
832 rollback();
833 synchronizationRegistered = false;
834 }
835 });
836 }
837 }
838
839 // The delivered message list is only needed for the recover method
840 // which is only used with client ack.
841 deliveredCounter++;
842
843 MessageAck oldPendingAck = pendingAck;
844 pendingAck = new MessageAck(md, ackType, deliveredCounter);
845 if( oldPendingAck==null ) {
846 pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
847 } else {
848 pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
849 }
850 pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
851
852 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
853 session.sendAck(pendingAck);
854 pendingAck=null;
855 additionalWindowSize = deliveredCounter;
856
857 // When using DUPS ok, we do a real ack.
858 if (ackType == MessageAck.STANDARD_ACK_TYPE) {
859 deliveredCounter = 0;
860 additionalWindowSize = 0;
861 }
862 }
863 }
864
865 /**
866 * Acknowledge all the messages that have been delivered to the client upto
867 * this point.
868 *
869 * @throws JMSException
870 */
871 public void acknowledge() throws JMSException {
872 synchronized(deliveredMessages) {
873 // Acknowledge all messages so far.
874 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
875 if (ack == null)
876 return; // no msgs
877
878 if (session.isTransacted()) {
879 session.doStartTransaction();
880 ack.setTransactionId(session.getTransactionContext().getTransactionId());
881 }
882 session.sendAck(ack);
883 pendingAck = null;
884
885 // Adjust the counters
886 deliveredCounter -= deliveredMessages.size();
887 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
888
889 if (!session.isTransacted()) {
890 deliveredMessages.clear();
891 }
892 }
893 }
894
895 void acknowledge(MessageDispatch md) throws JMSException {
896 MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
897 session.sendAck(ack);
898 synchronized(deliveredMessages){
899 deliveredMessages.remove(md);
900 }
901 }
902
903 public void commit() throws JMSException {
904 synchronized (deliveredMessages) {
905 deliveredMessages.clear();
906 }
907 redeliveryDelay = 0;
908 }
909
910 public void rollback() throws JMSException {
911 synchronized (unconsumedMessages.getMutex()) {
912 if (optimizeAcknowledge) {
913 // remove messages read but not acked at the broker yet through
914 // optimizeAcknowledge
915 if (!this.info.isBrowser()) {
916 synchronized(deliveredMessages) {
917 for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
918 // ensure we don't filter this as a duplicate
919 MessageDispatch md = deliveredMessages.removeLast();
920 session.connection.rollbackDuplicate(this, md.getMessage());
921 }
922 }
923 }
924 }
925 synchronized(deliveredMessages) {
926 if (deliveredMessages.isEmpty()) {
927 return;
928 }
929
930 // Only increase the redlivery delay after the first redelivery..
931 MessageDispatch lastMd = deliveredMessages.getFirst();
932 if (lastMd.getMessage().getRedeliveryCounter() > 0) {
933 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
934 }
935 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
936
937 for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
938 MessageDispatch md = (MessageDispatch)iter.next();
939 md.getMessage().onMessageRolledBack();
940 }
941
942 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
943 && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
944 // We need to NACK the messages so that they get sent to the
945 // DLQ.
946 // Acknowledge the last message.
947
948 MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
949 ack.setFirstMessageId(firstMsgId);
950 session.sendAck(ack,true);
951 // ensure we don't filter this as a duplicate
952 session.connection.rollbackDuplicate(this, lastMd.getMessage());
953 // Adjust the window size.
954 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
955 redeliveryDelay = 0;
956 } else {
957
958 MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
959 ack.setFirstMessageId(firstMsgId);
960 session.sendAck(ack,true);
961
962 // stop the delivery of messages.
963 unconsumedMessages.stop();
964
965 for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
966 MessageDispatch md = (MessageDispatch)iter.next();
967 unconsumedMessages.enqueueFirst(md);
968 }
969
970 if (redeliveryDelay > 0) {
971 // Start up the delivery again a little later.
972 Scheduler.executeAfterDelay(new Runnable() {
973 public void run() {
974 try {
975 if (started.get()) {
976 start();
977 }
978 } catch (JMSException e) {
979 session.connection.onAsyncException(e);
980 }
981 }
982 }, redeliveryDelay);
983 } else {
984 start();
985 }
986
987 }
988 deliveredCounter -= deliveredMessages.size();
989 deliveredMessages.clear();
990 }
991 }
992 if (messageListener.get() != null) {
993 session.redispatch(this, unconsumedMessages);
994 }
995 }
996
997 public void dispatch(MessageDispatch md) {
998 MessageListener listener = this.messageListener.get();
999 try {
1000 synchronized (unconsumedMessages.getMutex()) {
1001 if (clearDispatchList) {
1002 // we are reconnecting so lets flush the in progress
1003 // messages
1004 clearDispatchList = false;
1005 List<MessageDispatch> list = unconsumedMessages.removeAll();
1006 if (!this.info.isBrowser()) {
1007 for (MessageDispatch old : list) {
1008 // ensure we don't filter this as a duplicate
1009 session.connection.rollbackDuplicate(this, old.getMessage());
1010 }
1011 }
1012 }
1013 if (!unconsumedMessages.isClosed()) {
1014 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1015 if (listener != null && unconsumedMessages.isRunning()) {
1016 ActiveMQMessage message = createActiveMQMessage(md);
1017 beforeMessageIsConsumed(md);
1018 try {
1019 boolean expired = message.isExpired();
1020 if (!expired) {
1021 listener.onMessage(message);
1022 }
1023 afterMessageIsConsumed(md, expired);
1024 } catch (RuntimeException e) {
1025 if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
1026 // Redeliver the message
1027 } else {
1028 // Transacted or Client ack: Deliver the
1029 // next message.
1030 afterMessageIsConsumed(md, false);
1031 }
1032 LOG.error(getConsumerId() + " Exception while processing message: " + e, e);
1033 }
1034 } else {
1035 unconsumedMessages.enqueue(md);
1036 if (availableListener != null) {
1037 availableListener.onMessageAvailable(this);
1038 }
1039 }
1040 } else {
1041 // ignore duplicate
1042 if (LOG.isDebugEnabled()) {
1043 LOG.debug(getConsumerId() + " Ignoring Duplicate: " + md.getMessage());
1044 }
1045 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
1046 }
1047 }
1048 }
1049 if (++dispatchedCount % 1000 == 0) {
1050 dispatchedCount = 0;
1051 Thread.yield();
1052 }
1053 } catch (Exception e) {
1054 session.connection.onClientInternalException(e);
1055 }
1056 }
1057
1058 public int getMessageSize() {
1059 return unconsumedMessages.size();
1060 }
1061
1062 public void start() throws JMSException {
1063 if (unconsumedMessages.isClosed()) {
1064 return;
1065 }
1066 started.set(true);
1067 unconsumedMessages.start();
1068 session.executor.wakeup();
1069 }
1070
1071 public void stop() {
1072 started.set(false);
1073 unconsumedMessages.stop();
1074 }
1075
1076 public String toString() {
1077 return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1078 + " }";
1079 }
1080
1081 /**
1082 * Delivers a message to the message listener.
1083 *
1084 * @return
1085 * @throws JMSException
1086 */
1087 public boolean iterate() {
1088 MessageListener listener = this.messageListener.get();
1089 if (listener != null) {
1090 MessageDispatch md = unconsumedMessages.dequeueNoWait();
1091 if (md != null) {
1092 try {
1093 ActiveMQMessage message = createActiveMQMessage(md);
1094 beforeMessageIsConsumed(md);
1095 listener.onMessage(message);
1096 afterMessageIsConsumed(md, false);
1097 } catch (JMSException e) {
1098 session.connection.onClientInternalException(e);
1099 }
1100 return true;
1101 }
1102 }
1103 return false;
1104 }
1105
1106 public boolean isInUse(ActiveMQTempDestination destination) {
1107 return info.getDestination().equals(destination);
1108 }
1109
1110 }