001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.Collection;
022 import java.util.Collections;
023 import java.util.Comparator;
024 import java.util.HashSet;
025 import java.util.Iterator;
026 import java.util.LinkedHashMap;
027 import java.util.LinkedHashSet;
028 import java.util.LinkedList;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032 import java.util.concurrent.CancellationException;
033 import java.util.concurrent.ConcurrentLinkedQueue;
034 import java.util.concurrent.CountDownLatch;
035 import java.util.concurrent.DelayQueue;
036 import java.util.concurrent.Delayed;
037 import java.util.concurrent.ExecutorService;
038 import java.util.concurrent.Future;
039 import java.util.concurrent.TimeUnit;
040 import java.util.concurrent.atomic.AtomicLong;
041 import java.util.concurrent.locks.Lock;
042 import java.util.concurrent.locks.ReentrantLock;
043 import java.util.concurrent.locks.ReentrantReadWriteLock;
044
045 import javax.jms.InvalidSelectorException;
046 import javax.jms.JMSException;
047 import javax.jms.ResourceAllocationException;
048
049 import org.apache.activemq.broker.BrokerService;
050 import org.apache.activemq.broker.ConnectionContext;
051 import org.apache.activemq.broker.ProducerBrokerExchange;
052 import org.apache.activemq.broker.region.cursors.OrderedPendingList;
053 import org.apache.activemq.broker.region.cursors.PendingList;
054 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
055 import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
056 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
057 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
058 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
059 import org.apache.activemq.broker.region.group.MessageGroupMap;
060 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
061 import org.apache.activemq.broker.region.policy.DispatchPolicy;
062 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
063 import org.apache.activemq.broker.util.InsertionCountList;
064 import org.apache.activemq.command.ActiveMQDestination;
065 import org.apache.activemq.command.ActiveMQMessage;
066 import org.apache.activemq.command.ConsumerId;
067 import org.apache.activemq.command.ExceptionResponse;
068 import org.apache.activemq.command.Message;
069 import org.apache.activemq.command.MessageAck;
070 import org.apache.activemq.command.MessageDispatchNotification;
071 import org.apache.activemq.command.MessageId;
072 import org.apache.activemq.command.ProducerAck;
073 import org.apache.activemq.command.ProducerInfo;
074 import org.apache.activemq.command.Response;
075 import org.apache.activemq.filter.BooleanExpression;
076 import org.apache.activemq.filter.MessageEvaluationContext;
077 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
078 import org.apache.activemq.selector.SelectorParser;
079 import org.apache.activemq.state.ProducerState;
080 import org.apache.activemq.store.MessageRecoveryListener;
081 import org.apache.activemq.store.MessageStore;
082 import org.apache.activemq.thread.Task;
083 import org.apache.activemq.thread.TaskRunner;
084 import org.apache.activemq.thread.TaskRunnerFactory;
085 import org.apache.activemq.transaction.Synchronization;
086 import org.apache.activemq.usage.Usage;
087 import org.apache.activemq.usage.UsageListener;
088 import org.apache.activemq.util.BrokerSupport;
089 import org.apache.activemq.util.ThreadPoolUtils;
090 import org.slf4j.Logger;
091 import org.slf4j.LoggerFactory;
092 import org.slf4j.MDC;
093
094 /**
095 * The Queue is a List of MessageEntry objects that are dispatched to matching
096 * subscriptions.
097 */
098 public class Queue extends BaseDestination implements Task, UsageListener {
099 protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
100 protected final TaskRunnerFactory taskFactory;
101 protected TaskRunner taskRunner;
102 private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
103 protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
104 private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
105 protected PendingMessageCursor messages;
106 private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
107 private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
108 // Messages that are paged in but have not yet been targeted at a
109 // subscription
110 private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
111 protected PendingList pagedInPendingDispatch = new OrderedPendingList();
112 protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
113 private MessageGroupMap messageGroupOwners;
114 private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
115 private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
116 final Lock sendLock = new ReentrantLock();
117 private ExecutorService executor;
118 private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();
119 private boolean useConsumerPriority = true;
120 private boolean strictOrderDispatch = false;
121 private final QueueDispatchSelector dispatchSelector;
122 private boolean optimizedDispatch = false;
123 private boolean iterationRunning = false;
124 private boolean firstConsumer = false;
125 private int timeBeforeDispatchStarts = 0;
126 private int consumersBeforeDispatchStarts = 0;
127 private CountDownLatch consumersBeforeStartsLatch;
128 private final AtomicLong pendingWakeups = new AtomicLong();
129 private boolean allConsumersExclusiveByDefault = false;
130
131 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
132 public void run() {
133 asyncWakeup();
134 }
135 };
136 private final Runnable expireMessagesTask = new Runnable() {
137 public void run() {
138 expireMessages();
139 }
140 };
141
142 private final Object iteratingMutex = new Object();
143
144 class TimeoutMessage implements Delayed {
145
146 Message message;
147 ConnectionContext context;
148 long trigger;
149
150 public TimeoutMessage(Message message, ConnectionContext context, long delay) {
151 this.message = message;
152 this.context = context;
153 this.trigger = System.currentTimeMillis() + delay;
154 }
155
156 public long getDelay(TimeUnit unit) {
157 long n = trigger - System.currentTimeMillis();
158 return unit.convert(n, TimeUnit.MILLISECONDS);
159 }
160
161 public int compareTo(Delayed delayed) {
162 long other = ((TimeoutMessage) delayed).trigger;
163 int returnValue;
164 if (this.trigger < other) {
165 returnValue = -1;
166 } else if (this.trigger > other) {
167 returnValue = 1;
168 } else {
169 returnValue = 0;
170 }
171 return returnValue;
172 }
173
174 }
175
176 DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
177
178 class FlowControlTimeoutTask extends Thread {
179
180 @Override
181 public void run() {
182 TimeoutMessage timeout;
183 try {
184 while (true) {
185 timeout = flowControlTimeoutMessages.take();
186 if (timeout != null) {
187 synchronized (messagesWaitingForSpace) {
188 if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
189 ExceptionResponse response = new ExceptionResponse(
190 new ResourceAllocationException(
191 "Usage Manager Memory Limit reached. Stopping producer ("
192 + timeout.message.getProducerId()
193 + ") to prevent flooding "
194 + getActiveMQDestination().getQualifiedName()
195 + "."
196 + " See http://activemq.apache.org/producer-flow-control.html for more info"));
197 response.setCorrelationId(timeout.message.getCommandId());
198 timeout.context.getConnection().dispatchAsync(response);
199 }
200 }
201 }
202 }
203 } catch (InterruptedException e) {
204 if (LOG.isDebugEnabled()) {
205 LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
206 }
207 }
208 }
209 };
210
211 private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
212
213 private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
214
215 public int compare(Subscription s1, Subscription s2) {
216 // We want the list sorted in descending order
217 int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
218 if (val == 0 && messageGroupOwners != null) {
219 // then ascending order of assigned message groups to favour less loaded consumers
220 // Long.compare in jdk7
221 long x = s1.getConsumerInfo().getLastDeliveredSequenceId();
222 long y = s2.getConsumerInfo().getLastDeliveredSequenceId();
223 val = (x < y) ? -1 : ((x == y) ? 0 : 1);
224 }
225 return val;
226 }
227 };
228
229 public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
230 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
231 super(brokerService, store, destination, parentStats);
232 this.taskFactory = taskFactory;
233 this.dispatchSelector = new QueueDispatchSelector(destination);
234 }
235
236 public List<Subscription> getConsumers() {
237 consumersLock.readLock().lock();
238 try {
239 return new ArrayList<Subscription>(consumers);
240 }finally {
241 consumersLock.readLock().unlock();
242 }
243 }
244
245 // make the queue easily visible in the debugger from its task runner
246 // threads
247 final class QueueThread extends Thread {
248 final Queue queue;
249
250 public QueueThread(Runnable runnable, String name, Queue queue) {
251 super(runnable, name);
252 this.queue = queue;
253 }
254 }
255
256 class BatchMessageRecoveryListener implements MessageRecoveryListener {
257 final LinkedList<Message> toExpire = new LinkedList<Message>();
258 final double totalMessageCount;
259 int recoveredAccumulator = 0;
260 int currentBatchCount;
261
262 BatchMessageRecoveryListener(int totalMessageCount) {
263 this.totalMessageCount = totalMessageCount;
264 currentBatchCount = recoveredAccumulator;
265 }
266
267 public boolean recoverMessage(Message message) {
268 recoveredAccumulator++;
269 if (LOG.isInfoEnabled() && (recoveredAccumulator % 10000) == 0) {
270 LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered "
271 + recoveredAccumulator + " messages. " +
272 (int) (recoveredAccumulator * 100 / totalMessageCount) + "% complete");
273 }
274 // Message could have expired while it was being
275 // loaded..
276 if (message.isExpired() && broker.isExpired(message)) {
277 toExpire.add(message);
278 return true;
279 }
280 if (hasSpace()) {
281 message.setRegionDestination(Queue.this);
282 messagesLock.writeLock().lock();
283 try {
284 try {
285 messages.addMessageLast(message);
286 } catch (Exception e) {
287 LOG.error("Failed to add message to cursor", e);
288 }
289 } finally {
290 messagesLock.writeLock().unlock();
291 }
292 destinationStatistics.getMessages().increment();
293 return true;
294 }
295 return false;
296 }
297
298 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
299 throw new RuntimeException("Should not be called.");
300 }
301
302 public boolean hasSpace() {
303 return true;
304 }
305
306 public boolean isDuplicate(MessageId id) {
307 return false;
308 }
309
310 public void reset() {
311 currentBatchCount = recoveredAccumulator;
312 }
313
314 public void processExpired() {
315 for (Message message: toExpire) {
316 messageExpired(createConnectionContext(), createMessageReference(message));
317 // drop message will decrement so counter
318 // balance here
319 destinationStatistics.getMessages().increment();
320 }
321 toExpire.clear();
322 }
323
324 public boolean done() {
325 return currentBatchCount == recoveredAccumulator;
326 }
327 }
328
329 @Override
330 public void setPrioritizedMessages(boolean prioritizedMessages) {
331 super.setPrioritizedMessages(prioritizedMessages);
332
333 if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
334 pagedInPendingDispatch = new PrioritizedPendingList();
335 redeliveredWaitingDispatch = new PrioritizedPendingList();
336 } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
337 pagedInPendingDispatch = new OrderedPendingList();
338 redeliveredWaitingDispatch = new OrderedPendingList();
339 }
340 }
341
342 @Override
343 public void initialize() throws Exception {
344
345 if (this.messages == null) {
346 if (destination.isTemporary() || broker == null || store == null) {
347 this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
348 } else {
349 this.messages = new StoreQueueCursor(broker, this);
350 }
351 }
352
353 // If a VMPendingMessageCursor don't use the default Producer System
354 // Usage
355 // since it turns into a shared blocking queue which can lead to a
356 // network deadlock.
357 // If we are cursoring to disk..it's not and issue because it does not
358 // block due
359 // to large disk sizes.
360 if (messages instanceof VMPendingMessageCursor) {
361 this.systemUsage = brokerService.getSystemUsage();
362 memoryUsage.setParent(systemUsage.getMemoryUsage());
363 }
364
365 this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
366
367 super.initialize();
368 if (store != null) {
369 // Restore the persistent messages.
370 messages.setSystemUsage(systemUsage);
371 messages.setEnableAudit(isEnableAudit());
372 messages.setMaxAuditDepth(getMaxAuditDepth());
373 messages.setMaxProducersToAudit(getMaxProducersToAudit());
374 messages.setUseCache(isUseCache());
375 messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
376 final int messageCount = store.getMessageCount();
377 if (messageCount > 0 && messages.isRecoveryRequired()) {
378 BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
379 do {
380 listener.reset();
381 store.recoverNextMessages(getMaxPageSize(), listener);
382 listener.processExpired();
383 } while (!listener.done());
384 } else {
385 destinationStatistics.getMessages().setCount(messageCount);
386 }
387 }
388 }
389
390 /*
391 * Holder for subscription that needs attention on next iterate browser
392 * needs access to existing messages in the queue that have already been
393 * dispatched
394 */
395 class BrowserDispatch {
396 QueueBrowserSubscription browser;
397
398 public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
399 browser = browserSubscription;
400 browser.incrementQueueRef();
401 }
402
403 void done() {
404 try {
405 browser.decrementQueueRef();
406 } catch (Exception e) {
407 LOG.warn("decrement ref on browser: " + browser, e);
408 }
409 }
410
411 public QueueBrowserSubscription getBrowser() {
412 return browser;
413 }
414 }
415
416 ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
417
418 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
419 if (LOG.isDebugEnabled()) {
420 LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
421 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
422 + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
423 + getDestinationStatistics().getInflight().getCount());
424 }
425
426 super.addSubscription(context, sub);
427 // synchronize with dispatch method so that no new messages are sent
428 // while setting up a subscription. avoid out of order messages,
429 // duplicates, etc.
430 pagedInPendingDispatchLock.writeLock().lock();
431 try {
432
433 sub.add(context, this);
434
435 // needs to be synchronized - so no contention with dispatching
436 // consumersLock.
437 consumersLock.writeLock().lock();
438 try {
439
440 // set a flag if this is a first consumer
441 if (consumers.size() == 0) {
442 firstConsumer = true;
443 if (consumersBeforeDispatchStarts != 0) {
444 consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
445 }
446 } else {
447 if (consumersBeforeStartsLatch != null) {
448 consumersBeforeStartsLatch.countDown();
449 }
450 }
451
452 addToConsumerList(sub);
453 if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
454 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
455 if (exclusiveConsumer == null) {
456 exclusiveConsumer = sub;
457 } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
458 sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
459 exclusiveConsumer = sub;
460 }
461 dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
462 }
463 }finally {
464 consumersLock.writeLock().unlock();
465 }
466
467 if (sub instanceof QueueBrowserSubscription) {
468 // tee up for dispatch in next iterate
469 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
470 BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
471 browserDispatches.add(browserDispatch);
472 }
473
474 if (!(this.optimizedDispatch || isSlave())) {
475 wakeup();
476 }
477 }finally {
478 pagedInPendingDispatchLock.writeLock().unlock();
479 }
480 if (this.optimizedDispatch || isSlave()) {
481 // Outside of dispatchLock() to maintain the lock hierarchy of
482 // iteratingMutex -> dispatchLock. - see
483 // https://issues.apache.org/activemq/browse/AMQ-1878
484 wakeup();
485 }
486 }
487
488 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
489 throws Exception {
490 super.removeSubscription(context, sub, lastDeiveredSequenceId);
491 // synchronize with dispatch method so that no new messages are sent
492 // while removing up a subscription.
493 pagedInPendingDispatchLock.writeLock().lock();
494 try {
495 if (LOG.isDebugEnabled()) {
496 LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
497 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
498 + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
499 + getDestinationStatistics().getInflight().getCount());
500 }
501 consumersLock.writeLock().lock();
502 try {
503 removeFromConsumerList(sub);
504 if (sub.getConsumerInfo().isExclusive()) {
505 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
506 if (exclusiveConsumer == sub) {
507 exclusiveConsumer = null;
508 for (Subscription s : consumers) {
509 if (s.getConsumerInfo().isExclusive()
510 && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
511 .getConsumerInfo().getPriority())) {
512 exclusiveConsumer = s;
513
514 }
515 }
516 dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
517 }
518 } else if (isAllConsumersExclusiveByDefault()) {
519 Subscription exclusiveConsumer = null;
520 for (Subscription s : consumers) {
521 if (exclusiveConsumer == null
522 || s.getConsumerInfo().getPriority() > exclusiveConsumer
523 .getConsumerInfo().getPriority()) {
524 exclusiveConsumer = s;
525 }
526 }
527 dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
528 }
529 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
530 getMessageGroupOwners().removeConsumer(consumerId);
531
532 // redeliver inflight messages
533
534 boolean markAsRedelivered = false;
535 MessageReference lastDeliveredRef = null;
536 List<MessageReference> unAckedMessages = sub.remove(context, this);
537
538 // locate last redelivered in unconsumed list (list in delivery rather than seq order)
539 if (lastDeiveredSequenceId != 0) {
540 for (MessageReference ref : unAckedMessages) {
541 if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
542 lastDeliveredRef = ref;
543 markAsRedelivered = true;
544 if (LOG.isDebugEnabled()) {
545 LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId());
546 }
547 break;
548 }
549 }
550 }
551 for (MessageReference ref : unAckedMessages) {
552 QueueMessageReference qmr = (QueueMessageReference) ref;
553 if (qmr.getLockOwner() == sub) {
554 qmr.unlock();
555
556 // have no delivery information
557 if (lastDeiveredSequenceId == 0) {
558 qmr.incrementRedeliveryCounter();
559 } else {
560 if (markAsRedelivered) {
561 qmr.incrementRedeliveryCounter();
562 }
563 if (ref == lastDeliveredRef) {
564 // all that follow were not redelivered
565 markAsRedelivered = false;
566 }
567 }
568 }
569 redeliveredWaitingDispatch.addMessageLast(qmr);
570 }
571 if (!redeliveredWaitingDispatch.isEmpty()) {
572 doDispatch(new OrderedPendingList());
573 }
574 }finally {
575 consumersLock.writeLock().unlock();
576 }
577 if (!(this.optimizedDispatch || isSlave())) {
578 wakeup();
579 }
580 }finally {
581 pagedInPendingDispatchLock.writeLock().unlock();
582 }
583 if (this.optimizedDispatch || isSlave()) {
584 // Outside of dispatchLock() to maintain the lock hierarchy of
585 // iteratingMutex -> dispatchLock. - see
586 // https://issues.apache.org/activemq/browse/AMQ-1878
587 wakeup();
588 }
589 }
590
591 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
592 final ConnectionContext context = producerExchange.getConnectionContext();
593 // There is delay between the client sending it and it arriving at the
594 // destination.. it may have expired.
595 message.setRegionDestination(this);
596 ProducerState state = producerExchange.getProducerState();
597 if (state == null) {
598 LOG.warn("Send failed for: " + message + ", missing producer state for: " + producerExchange);
599 throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
600 }
601 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
602 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
603 && !context.isInRecoveryMode();
604 if (message.isExpired()) {
605 // message not stored - or added to stats yet - so chuck here
606 broker.getRoot().messageExpired(context, message, null);
607 if (sendProducerAck) {
608 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
609 context.getConnection().dispatchAsync(ack);
610 }
611 return;
612 }
613 if (memoryUsage.isFull()) {
614 isFull(context, memoryUsage);
615 fastProducer(context, producerInfo);
616 if (isProducerFlowControl() && context.isProducerFlowControl()) {
617 if (warnOnProducerFlowControl) {
618 warnOnProducerFlowControl = false;
619 LOG
620 .info("Usage Manager Memory Limit ("
621 + memoryUsage.getLimit()
622 + ") reached on "
623 + getActiveMQDestination().getQualifiedName()
624 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
625 + " See http://activemq.apache.org/producer-flow-control.html for more info");
626 }
627
628 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
629 throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
630 + message.getProducerId() + ") to prevent flooding "
631 + getActiveMQDestination().getQualifiedName() + "."
632 + " See http://activemq.apache.org/producer-flow-control.html for more info");
633 }
634
635 // We can avoid blocking due to low usage if the producer is
636 // sending
637 // a sync message or if it is using a producer window
638 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
639 // copy the exchange state since the context will be
640 // modified while we are waiting
641 // for space.
642 final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
643 synchronized (messagesWaitingForSpace) {
644 // Start flow control timeout task
645 // Prevent trying to start it multiple times
646 if (!flowControlTimeoutTask.isAlive()) {
647 flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task");
648 flowControlTimeoutTask.start();
649 }
650 messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
651 public void run() {
652
653 try {
654 // While waiting for space to free up... the
655 // message may have expired.
656 if (message.isExpired()) {
657 LOG.error("expired waiting for space..");
658 broker.messageExpired(context, message, null);
659 destinationStatistics.getExpired().increment();
660 } else {
661 doMessageSend(producerExchangeCopy, message);
662 }
663
664 if (sendProducerAck) {
665 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
666 .getSize());
667 context.getConnection().dispatchAsync(ack);
668 } else {
669 Response response = new Response();
670 response.setCorrelationId(message.getCommandId());
671 context.getConnection().dispatchAsync(response);
672 }
673
674 } catch (Exception e) {
675 if (!sendProducerAck && !context.isInRecoveryMode()) {
676 ExceptionResponse response = new ExceptionResponse(e);
677 response.setCorrelationId(message.getCommandId());
678 context.getConnection().dispatchAsync(response);
679 } else {
680 LOG.debug("unexpected exception on deferred send of :" + message, e);
681 }
682 }
683 }
684 });
685
686 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
687 flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
688 .getSendFailIfNoSpaceAfterTimeout()));
689 }
690
691 registerCallbackForNotFullNotification();
692 context.setDontSendReponse(true);
693 return;
694 }
695
696 } else {
697
698 if (memoryUsage.isFull()) {
699 waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
700 + message.getProducerId() + ") stopped to prevent flooding "
701 + getActiveMQDestination().getQualifiedName() + "."
702 + " See http://activemq.apache.org/producer-flow-control.html for more info");
703 }
704
705 // The usage manager could have delayed us by the time
706 // we unblock the message could have expired..
707 if (message.isExpired()) {
708 if (LOG.isDebugEnabled()) {
709 LOG.debug("Expired message: " + message);
710 }
711 broker.getRoot().messageExpired(context, message, null);
712 return;
713 }
714 }
715 }
716 }
717 doMessageSend(producerExchange, message);
718 if (sendProducerAck) {
719 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
720 context.getConnection().dispatchAsync(ack);
721 }
722 }
723
724 private void registerCallbackForNotFullNotification() {
725 // If the usage manager is not full, then the task will not
726 // get called..
727 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
728 // so call it directly here.
729 sendMessagesWaitingForSpaceTask.run();
730 }
731 }
732
733 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
734 Exception {
735 final ConnectionContext context = producerExchange.getConnectionContext();
736 Future<Object> result = null;
737
738 checkUsage(context, message);
739 sendLock.lockInterruptibly();
740 try {
741 if (store != null && message.isPersistent()) {
742 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
743 if (messages.isCacheEnabled()) {
744 result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
745 } else {
746 store.addMessage(context, message);
747 }
748 if (isReduceMemoryFootprint()) {
749 message.clearMarshalledState();
750 }
751 }
752 if (context.isInTransaction()) {
753 // If this is a transacted message.. increase the usage now so that
754 // a big TX does not blow up
755 // our memory. This increment is decremented once the tx finishes..
756 message.incrementReferenceCount();
757
758 context.getTransaction().addSynchronization(new Synchronization() {
759 @Override
760 public void afterCommit() throws Exception {
761 sendLock.lockInterruptibly();
762 try {
763 // It could take while before we receive the commit
764 // op, by that time the message could have expired..
765 if (broker.isExpired(message)) {
766 broker.messageExpired(context, message, null);
767 destinationStatistics.getExpired().increment();
768 return;
769 }
770 sendMessage(message);
771 } finally {
772 sendLock.unlock();
773 message.decrementReferenceCount();
774 }
775 messageSent(context, message);
776 }
777 @Override
778 public void afterRollback() throws Exception {
779 message.decrementReferenceCount();
780 }
781 });
782 } else {
783 // Add to the pending list, this takes care of incrementing the
784 // usage manager.
785 sendMessage(message);
786 }
787 } finally {
788 sendLock.unlock();
789 }
790 if (!context.isInTransaction()) {
791 messageSent(context, message);
792 }
793 if (result != null && !result.isCancelled()) {
794 try {
795 result.get();
796 } catch (CancellationException e) {
797 // ignore - the task has been cancelled if the message
798 // has already been deleted
799 }
800 }
801 }
802
803 private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
804 if (message.isPersistent()) {
805 if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
806 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
807 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
808 + message.getProducerId() + ") to prevent flooding "
809 + getActiveMQDestination().getQualifiedName() + "."
810 + " See http://activemq.apache.org/producer-flow-control.html for more info";
811
812 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
813 }
814 } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
815 final String logMessage = "Temp Store is Full ("
816 + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
817 +"). Stopping producer (" + message.getProducerId()
818 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
819 + " See http://activemq.apache.org/producer-flow-control.html for more info";
820
821 waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
822 }
823 }
824
825 private void expireMessages() {
826 if (LOG.isDebugEnabled()) {
827 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages ..");
828 }
829
830 // just track the insertion count
831 List<Message> browsedMessages = new InsertionCountList<Message>();
832 doBrowse(browsedMessages, this.getMaxExpirePageSize());
833 asyncWakeup();
834 if (LOG.isDebugEnabled()) {
835 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages done.");
836 }
837 }
838
839 public void gc() {
840 }
841
842 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
843 throws IOException {
844 messageConsumed(context, node);
845 if (store != null && node.isPersistent()) {
846 store.removeAsyncMessage(context, convertToNonRangedAck(ack, node));
847 }
848 }
849
850 Message loadMessage(MessageId messageId) throws IOException {
851 Message msg = null;
852 if (store != null) { // can be null for a temp q
853 msg = store.getMessage(messageId);
854 if (msg != null) {
855 msg.setRegionDestination(this);
856 }
857 }
858 return msg;
859 }
860
861 @Override
862 public String toString() {
863 int size = 0;
864 messagesLock.readLock().lock();
865 try{
866 size = messages.size();
867 }finally {
868 messagesLock.readLock().unlock();
869 }
870 return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
871 + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
872 + messageGroupOwners;
873 }
874
875 public void start() throws Exception {
876 if (memoryUsage != null) {
877 memoryUsage.start();
878 }
879 if (systemUsage.getStoreUsage() != null) {
880 systemUsage.getStoreUsage().start();
881 }
882 systemUsage.getMemoryUsage().addUsageListener(this);
883 messages.start();
884 if (getExpireMessagesPeriod() > 0) {
885 scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
886 }
887 doPageIn(false);
888 }
889
890 public void stop() throws Exception {
891 if (taskRunner != null) {
892 taskRunner.shutdown();
893 }
894 if (this.executor != null) {
895 ThreadPoolUtils.shutdownNow(executor);
896 executor = null;
897 }
898
899 scheduler.cancel(expireMessagesTask);
900
901 if (flowControlTimeoutTask.isAlive()) {
902 flowControlTimeoutTask.interrupt();
903 }
904
905 if (messages != null) {
906 messages.stop();
907 }
908
909 systemUsage.getMemoryUsage().removeUsageListener(this);
910 if (memoryUsage != null) {
911 memoryUsage.stop();
912 }
913 if (store != null) {
914 store.stop();
915 }
916 }
917
918 // Properties
919 // -------------------------------------------------------------------------
920 @Override
921 public ActiveMQDestination getActiveMQDestination() {
922 return destination;
923 }
924
925 public MessageGroupMap getMessageGroupOwners() {
926 if (messageGroupOwners == null) {
927 messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
928 }
929 return messageGroupOwners;
930 }
931
932 public DispatchPolicy getDispatchPolicy() {
933 return dispatchPolicy;
934 }
935
936 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
937 this.dispatchPolicy = dispatchPolicy;
938 }
939
940 public MessageGroupMapFactory getMessageGroupMapFactory() {
941 return messageGroupMapFactory;
942 }
943
944 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
945 this.messageGroupMapFactory = messageGroupMapFactory;
946 }
947
948 public PendingMessageCursor getMessages() {
949 return this.messages;
950 }
951
952 public void setMessages(PendingMessageCursor messages) {
953 this.messages = messages;
954 }
955
956 public boolean isUseConsumerPriority() {
957 return useConsumerPriority;
958 }
959
960 public void setUseConsumerPriority(boolean useConsumerPriority) {
961 this.useConsumerPriority = useConsumerPriority;
962 }
963
964 public boolean isStrictOrderDispatch() {
965 return strictOrderDispatch;
966 }
967
968 public void setStrictOrderDispatch(boolean strictOrderDispatch) {
969 this.strictOrderDispatch = strictOrderDispatch;
970 }
971
972 public boolean isOptimizedDispatch() {
973 return optimizedDispatch;
974 }
975
976 public void setOptimizedDispatch(boolean optimizedDispatch) {
977 this.optimizedDispatch = optimizedDispatch;
978 }
979
980 public int getTimeBeforeDispatchStarts() {
981 return timeBeforeDispatchStarts;
982 }
983
984 public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
985 this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
986 }
987
988 public int getConsumersBeforeDispatchStarts() {
989 return consumersBeforeDispatchStarts;
990 }
991
992 public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
993 this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
994 }
995
996 public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
997 this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
998 }
999
1000 public boolean isAllConsumersExclusiveByDefault() {
1001 return allConsumersExclusiveByDefault;
1002 }
1003
1004
1005 // Implementation methods
1006 // -------------------------------------------------------------------------
1007 private QueueMessageReference createMessageReference(Message message) {
1008 QueueMessageReference result = new IndirectMessageReference(message);
1009 return result;
1010 }
1011
1012 public Message[] browse() {
1013 List<Message> browseList = new ArrayList<Message>();
1014 doBrowse(browseList, getMaxBrowsePageSize());
1015 return browseList.toArray(new Message[browseList.size()]);
1016 }
1017
1018 public void doBrowse(List<Message> browseList, int max) {
1019 final ConnectionContext connectionContext = createConnectionContext();
1020 try {
1021 pageInMessages(!isUseCache());
1022 List<MessageReference> toExpire = new ArrayList<MessageReference>();
1023
1024 pagedInPendingDispatchLock.writeLock().lock();
1025 try {
1026 addAll(pagedInPendingDispatch.values(), browseList, max, toExpire);
1027 for (MessageReference ref : toExpire) {
1028 pagedInPendingDispatch.remove(ref);
1029 if (broker.isExpired(ref)) {
1030 if (LOG.isDebugEnabled()) {
1031 LOG.debug("expiring from pagedInPending: " + ref);
1032 }
1033 messageExpired(connectionContext, ref);
1034 }
1035 }
1036 } finally {
1037 pagedInPendingDispatchLock.writeLock().unlock();
1038 }
1039 toExpire.clear();
1040 pagedInMessagesLock.readLock().lock();
1041 try {
1042 addAll(pagedInMessages.values(), browseList, max, toExpire);
1043 } finally {
1044 pagedInMessagesLock.readLock().unlock();
1045 }
1046 for (MessageReference ref : toExpire) {
1047 if (broker.isExpired(ref)) {
1048 if (LOG.isDebugEnabled()) {
1049 LOG.debug("expiring from pagedInMessages: " + ref);
1050 }
1051 messageExpired(connectionContext, ref);
1052 } else {
1053 pagedInMessagesLock.writeLock().lock();
1054 try {
1055 pagedInMessages.remove(ref.getMessageId());
1056 } finally {
1057 pagedInMessagesLock.writeLock().unlock();
1058 }
1059 }
1060 }
1061
1062 if (browseList.size() < getMaxBrowsePageSize()) {
1063 messagesLock.writeLock().lock();
1064 try {
1065 try {
1066 messages.reset();
1067 while (messages.hasNext() && browseList.size() < max) {
1068 MessageReference node = messages.next();
1069 if (node.isExpired()) {
1070 if (broker.isExpired(node)) {
1071 if (LOG.isDebugEnabled()) {
1072 LOG.debug("expiring from messages: " + node);
1073 }
1074 messageExpired(connectionContext, createMessageReference(node.getMessage()));
1075 }
1076 messages.remove();
1077 } else {
1078 messages.rollback(node.getMessageId());
1079 if (browseList.contains(node.getMessage()) == false) {
1080 browseList.add(node.getMessage());
1081 }
1082 }
1083 node.decrementReferenceCount();
1084 }
1085 } finally {
1086 messages.release();
1087 }
1088 } finally {
1089 messagesLock.writeLock().unlock();
1090 }
1091 }
1092
1093 } catch (Exception e) {
1094 LOG.error("Problem retrieving message for browse", e);
1095 }
1096 }
1097
1098 private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize,
1099 List<MessageReference> toExpire) throws Exception {
1100 for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
1101 QueueMessageReference ref = (QueueMessageReference) i.next();
1102 if (ref.isExpired()) {
1103 toExpire.add(ref);
1104 } else if (l.contains(ref.getMessage()) == false) {
1105 l.add(ref.getMessage());
1106 }
1107 }
1108 }
1109
1110 public QueueMessageReference getMessage(String id) {
1111 MessageId msgId = new MessageId(id);
1112 pagedInMessagesLock.readLock().lock();
1113 try{
1114 QueueMessageReference ref = this.pagedInMessages.get(msgId);
1115 if (ref != null) {
1116 return ref;
1117 }
1118 }finally {
1119 pagedInMessagesLock.readLock().unlock();
1120 }
1121 messagesLock.readLock().lock();
1122 try{
1123 try {
1124 messages.reset();
1125 while (messages.hasNext()) {
1126 MessageReference mr = messages.next();
1127 QueueMessageReference qmr = createMessageReference(mr.getMessage());
1128 qmr.decrementReferenceCount();
1129 messages.rollback(qmr.getMessageId());
1130 if (msgId.equals(qmr.getMessageId())) {
1131 return qmr;
1132 }
1133 }
1134 } finally {
1135 messages.release();
1136 }
1137 }finally {
1138 messagesLock.readLock().unlock();
1139 }
1140 return null;
1141 }
1142
1143 public void purge() throws Exception {
1144 ConnectionContext c = createConnectionContext();
1145 List<MessageReference> list = null;
1146 do {
1147 doPageIn(true);
1148 pagedInMessagesLock.readLock().lock();
1149 try {
1150 list = new ArrayList<MessageReference>(pagedInMessages.values());
1151 }finally {
1152 pagedInMessagesLock.readLock().unlock();
1153 }
1154
1155 for (MessageReference ref : list) {
1156 try {
1157 QueueMessageReference r = (QueueMessageReference) ref;
1158 removeMessage(c, r);
1159 } catch (IOException e) {
1160 }
1161 }
1162 // don't spin/hang if stats are out and there is nothing left in the
1163 // store
1164 } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
1165 if (this.destinationStatistics.getMessages().getCount() > 0) {
1166 LOG.warn(getActiveMQDestination().getQualifiedName()
1167 + " after purge complete, message count stats report: "
1168 + this.destinationStatistics.getMessages().getCount());
1169 }
1170 gc();
1171 this.destinationStatistics.getMessages().setCount(0);
1172 getMessages().clear();
1173 }
1174
1175 public void clearPendingMessages() {
1176 messagesLock.writeLock().lock();
1177 try {
1178 if (store != null) {
1179 store.resetBatching();
1180 }
1181 messages.gc();
1182 messages.reset();
1183 asyncWakeup();
1184 } finally {
1185 messagesLock.writeLock().unlock();
1186 }
1187 }
1188
1189 /**
1190 * Removes the message matching the given messageId
1191 */
1192 public boolean removeMessage(String messageId) throws Exception {
1193 return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
1194 }
1195
1196 /**
1197 * Removes the messages matching the given selector
1198 *
1199 * @return the number of messages removed
1200 */
1201 public int removeMatchingMessages(String selector) throws Exception {
1202 return removeMatchingMessages(selector, -1);
1203 }
1204
1205 /**
1206 * Removes the messages matching the given selector up to the maximum number
1207 * of matched messages
1208 *
1209 * @return the number of messages removed
1210 */
1211 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
1212 return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
1213 }
1214
1215 /**
1216 * Removes the messages matching the given filter up to the maximum number
1217 * of matched messages
1218 *
1219 * @return the number of messages removed
1220 */
1221 public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
1222 int movedCounter = 0;
1223 Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1224 ConnectionContext context = createConnectionContext();
1225 do {
1226 doPageIn(true);
1227 pagedInMessagesLock.readLock().lock();
1228 try{
1229 set.addAll(pagedInMessages.values());
1230 }finally {
1231 pagedInMessagesLock.readLock().unlock();
1232 }
1233 List<MessageReference> list = new ArrayList<MessageReference>(set);
1234 for (MessageReference ref : list) {
1235 IndirectMessageReference r = (IndirectMessageReference) ref;
1236 if (filter.evaluate(context, r)) {
1237
1238 removeMessage(context, r);
1239 set.remove(r);
1240 if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1241 return movedCounter;
1242 }
1243 }
1244 }
1245 } while (set.size() < this.destinationStatistics.getMessages().getCount());
1246 return movedCounter;
1247 }
1248
1249 /**
1250 * Copies the message matching the given messageId
1251 */
1252 public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1253 throws Exception {
1254 return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
1255 }
1256
1257 /**
1258 * Copies the messages matching the given selector
1259 *
1260 * @return the number of messages copied
1261 */
1262 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1263 throws Exception {
1264 return copyMatchingMessagesTo(context, selector, dest, -1);
1265 }
1266
1267 /**
1268 * Copies the messages matching the given selector up to the maximum number
1269 * of matched messages
1270 *
1271 * @return the number of messages copied
1272 */
1273 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1274 int maximumMessages) throws Exception {
1275 return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
1276 }
1277
1278 /**
1279 * Copies the messages matching the given filter up to the maximum number of
1280 * matched messages
1281 *
1282 * @return the number of messages copied
1283 */
1284 public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
1285 int maximumMessages) throws Exception {
1286 int movedCounter = 0;
1287 int count = 0;
1288 Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1289 do {
1290 int oldMaxSize = getMaxPageSize();
1291 setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
1292 doPageIn(true);
1293 setMaxPageSize(oldMaxSize);
1294 pagedInMessagesLock.readLock().lock();
1295 try {
1296 set.addAll(pagedInMessages.values());
1297 }finally {
1298 pagedInMessagesLock.readLock().unlock();
1299 }
1300 List<MessageReference> list = new ArrayList<MessageReference>(set);
1301 for (MessageReference ref : list) {
1302 IndirectMessageReference r = (IndirectMessageReference) ref;
1303 if (filter.evaluate(context, r)) {
1304
1305 r.incrementReferenceCount();
1306 try {
1307 Message m = r.getMessage();
1308 BrokerSupport.resend(context, m, dest);
1309 if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1310 return movedCounter;
1311 }
1312 } finally {
1313 r.decrementReferenceCount();
1314 }
1315 }
1316 count++;
1317 }
1318 } while (count < this.destinationStatistics.getMessages().getCount());
1319 return movedCounter;
1320 }
1321
1322 /**
1323 * Move a message
1324 *
1325 * @param context
1326 * connection context
1327 * @param m
1328 * QueueMessageReference
1329 * @param dest
1330 * ActiveMQDestination
1331 * @throws Exception
1332 */
1333 public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
1334 BrokerSupport.resend(context, m.getMessage(), dest);
1335 removeMessage(context, m);
1336 messagesLock.writeLock().lock();
1337 try{
1338 messages.rollback(m.getMessageId());
1339 }finally {
1340 messagesLock.writeLock().unlock();
1341 }
1342 return true;
1343 }
1344
1345 /**
1346 * Moves the message matching the given messageId
1347 */
1348 public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1349 throws Exception {
1350 return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
1351 }
1352
1353 /**
1354 * Moves the messages matching the given selector
1355 *
1356 * @return the number of messages removed
1357 */
1358 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1359 throws Exception {
1360 return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
1361 }
1362
1363 /**
1364 * Moves the messages matching the given selector up to the maximum number
1365 * of matched messages
1366 */
1367 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1368 int maximumMessages) throws Exception {
1369 return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
1370 }
1371
1372 /**
1373 * Moves the messages matching the given filter up to the maximum number of
1374 * matched messages
1375 */
1376 public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
1377 ActiveMQDestination dest, int maximumMessages) throws Exception {
1378 int movedCounter = 0;
1379 Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
1380 do {
1381 doPageIn(true);
1382 pagedInMessagesLock.readLock().lock();
1383 try{
1384 set.addAll(pagedInMessages.values());
1385 }finally {
1386 pagedInMessagesLock.readLock().unlock();
1387 }
1388 List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
1389 for (QueueMessageReference ref : list) {
1390 if (filter.evaluate(context, ref)) {
1391 // We should only move messages that can be locked.
1392 moveMessageTo(context, ref, dest);
1393 set.remove(ref);
1394 if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1395 return movedCounter;
1396 }
1397 }
1398 }
1399 } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
1400 return movedCounter;
1401 }
1402
1403 /**
1404 * @return true if we would like to iterate again
1405 * @see org.apache.activemq.thread.Task#iterate()
1406 */
1407 public boolean iterate() {
1408 MDC.put("activemq.destination", getName());
1409 boolean pageInMoreMessages = false;
1410 synchronized (iteratingMutex) {
1411
1412 // If optimize dispatch is on or this is a slave this method could be called recursively
1413 // we set this state value to short-circuit wakeup in those cases to avoid that as it
1414 // could lead to errors.
1415 iterationRunning = true;
1416
1417 // do early to allow dispatch of these waiting messages
1418 synchronized (messagesWaitingForSpace) {
1419 Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
1420 while (it.hasNext()) {
1421 if (!memoryUsage.isFull()) {
1422 Runnable op = it.next();
1423 it.remove();
1424 op.run();
1425 } else {
1426 registerCallbackForNotFullNotification();
1427 break;
1428 }
1429 }
1430 }
1431
1432 if (firstConsumer) {
1433 firstConsumer = false;
1434 try {
1435 if (consumersBeforeDispatchStarts > 0) {
1436 int timeout = 1000; // wait one second by default if
1437 // consumer count isn't reached
1438 if (timeBeforeDispatchStarts > 0) {
1439 timeout = timeBeforeDispatchStarts;
1440 }
1441 if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
1442 if (LOG.isDebugEnabled()) {
1443 LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
1444 }
1445 } else {
1446 if (LOG.isDebugEnabled()) {
1447 LOG.debug(timeout + " ms elapsed and " + consumers.size()
1448 + " consumers subscribed. Starting dispatch.");
1449 }
1450 }
1451 }
1452 if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
1453 iteratingMutex.wait(timeBeforeDispatchStarts);
1454 if (LOG.isDebugEnabled()) {
1455 LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
1456 }
1457 }
1458 } catch (Exception e) {
1459 LOG.error(e.toString());
1460 }
1461 }
1462
1463 BrowserDispatch pendingBrowserDispatch = browserDispatches.poll();
1464
1465 messagesLock.readLock().lock();
1466 try{
1467 pageInMoreMessages |= !messages.isEmpty();
1468 } finally {
1469 messagesLock.readLock().unlock();
1470 }
1471
1472 pagedInPendingDispatchLock.readLock().lock();
1473 try {
1474 pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
1475 } finally {
1476 pagedInPendingDispatchLock.readLock().unlock();
1477 }
1478
1479 // Perhaps we should page always into the pagedInPendingDispatch
1480 // list if
1481 // !messages.isEmpty(), and then if
1482 // !pagedInPendingDispatch.isEmpty()
1483 // then we do a dispatch.
1484 if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
1485 try {
1486 pageInMessages(pendingBrowserDispatch != null);
1487
1488 } catch (Throwable e) {
1489 LOG.error("Failed to page in more queue messages ", e);
1490 }
1491 }
1492
1493 if (pendingBrowserDispatch != null) {
1494 ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
1495 pagedInMessagesLock.readLock().lock();
1496 try{
1497 alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
1498 }finally {
1499 pagedInMessagesLock.readLock().unlock();
1500 }
1501 if (LOG.isDebugEnabled()) {
1502 LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
1503 + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
1504 }
1505 do {
1506 try {
1507 MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
1508 msgContext.setDestination(destination);
1509
1510 QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
1511 for (QueueMessageReference node : alreadyDispatchedMessages) {
1512 if (!node.isAcked()) {
1513 msgContext.setMessageReference(node);
1514 if (browser.matches(node, msgContext)) {
1515 browser.add(node);
1516 }
1517 }
1518 }
1519 pendingBrowserDispatch.done();
1520 } catch (Exception e) {
1521 LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
1522 }
1523
1524 } while ((pendingBrowserDispatch = browserDispatches.poll()) != null);
1525 }
1526
1527 if (pendingWakeups.get() > 0) {
1528 pendingWakeups.decrementAndGet();
1529 }
1530 MDC.remove("activemq.destination");
1531 iterationRunning = false;
1532
1533 return pendingWakeups.get() > 0;
1534 }
1535 }
1536
1537 protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
1538 return new MessageReferenceFilter() {
1539 public boolean evaluate(ConnectionContext context, MessageReference r) {
1540 return messageId.equals(r.getMessageId().toString());
1541 }
1542
1543 @Override
1544 public String toString() {
1545 return "MessageIdFilter: " + messageId;
1546 }
1547 };
1548 }
1549
1550 protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {
1551
1552 if (selector == null || selector.isEmpty()) {
1553 return new MessageReferenceFilter() {
1554
1555 @Override
1556 public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException {
1557 return true;
1558 }
1559 };
1560 }
1561
1562 final BooleanExpression selectorExpression = SelectorParser.parse(selector);
1563
1564 return new MessageReferenceFilter() {
1565 public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
1566 MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
1567
1568 messageEvaluationContext.setMessageReference(r);
1569 if (messageEvaluationContext.getDestination() == null) {
1570 messageEvaluationContext.setDestination(getActiveMQDestination());
1571 }
1572
1573 return selectorExpression.matches(messageEvaluationContext);
1574 }
1575 };
1576 }
1577
1578 protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
1579 removeMessage(c, null, r);
1580 pagedInPendingDispatchLock.writeLock().lock();
1581 try {
1582 pagedInPendingDispatch.remove(r);
1583 } finally {
1584 pagedInPendingDispatchLock.writeLock().unlock();
1585 }
1586
1587 }
1588
1589 protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
1590 MessageAck ack = new MessageAck();
1591 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
1592 ack.setDestination(destination);
1593 ack.setMessageID(r.getMessageId());
1594 removeMessage(c, subs, r, ack);
1595 }
1596
1597 protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
1598 MessageAck ack) throws IOException {
1599 reference.setAcked(true);
1600 // This sends the ack the the journal..
1601 if (!ack.isInTransaction()) {
1602 acknowledge(context, sub, ack, reference);
1603 getDestinationStatistics().getDequeues().increment();
1604 dropMessage(reference);
1605 } else {
1606 try {
1607 acknowledge(context, sub, ack, reference);
1608 } finally {
1609 context.getTransaction().addSynchronization(new Synchronization() {
1610
1611 @Override
1612 public void afterCommit() throws Exception {
1613 getDestinationStatistics().getDequeues().increment();
1614 dropMessage(reference);
1615 wakeup();
1616 }
1617
1618 @Override
1619 public void afterRollback() throws Exception {
1620 reference.setAcked(false);
1621 wakeup();
1622 }
1623 });
1624 }
1625 }
1626 if (ack.isPoisonAck()) {
1627 // message gone to DLQ, is ok to allow redelivery
1628 messagesLock.writeLock().lock();
1629 try{
1630 messages.rollback(reference.getMessageId());
1631 }finally {
1632 messagesLock.writeLock().unlock();
1633 }
1634 }
1635
1636 }
1637
1638 private void dropMessage(QueueMessageReference reference) {
1639 reference.drop();
1640 destinationStatistics.getMessages().decrement();
1641 pagedInMessagesLock.writeLock().lock();
1642 try{
1643 pagedInMessages.remove(reference.getMessageId());
1644 }finally {
1645 pagedInMessagesLock.writeLock().unlock();
1646 }
1647 }
1648
1649 public void messageExpired(ConnectionContext context, MessageReference reference) {
1650 messageExpired(context, null, reference);
1651 }
1652
1653 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
1654 if (LOG.isDebugEnabled()) {
1655 LOG.debug("message expired: " + reference);
1656 }
1657 broker.messageExpired(context, reference, subs);
1658 destinationStatistics.getExpired().increment();
1659 try {
1660 removeMessage(context, subs, (QueueMessageReference) reference);
1661 } catch (IOException e) {
1662 LOG.error("Failed to remove expired Message from the store ", e);
1663 }
1664 }
1665
1666 final void sendMessage(final Message msg) throws Exception {
1667 messagesLock.writeLock().lock();
1668 try{
1669 messages.addMessageLast(msg);
1670 }finally {
1671 messagesLock.writeLock().unlock();
1672 }
1673 }
1674
1675 final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
1676 destinationStatistics.getEnqueues().increment();
1677 destinationStatistics.getMessages().increment();
1678 messageDelivered(context, msg);
1679 consumersLock.readLock().lock();
1680 try {
1681 if (consumers.isEmpty()) {
1682 onMessageWithNoConsumers(context, msg);
1683 }
1684 }finally {
1685 consumersLock.readLock().unlock();
1686 }
1687 if (LOG.isDebugEnabled()) {
1688 LOG.debug(broker.getBrokerName() + " Message " + msg.getMessageId() + " sent to " + this.destination);
1689 }
1690 wakeup();
1691 }
1692
1693 public void wakeup() {
1694 if ((optimizedDispatch || isSlave()) && !iterationRunning) {
1695 iterate();
1696 pendingWakeups.incrementAndGet();
1697 } else {
1698 asyncWakeup();
1699 }
1700 }
1701
1702 private void asyncWakeup() {
1703 try {
1704 pendingWakeups.incrementAndGet();
1705 this.taskRunner.wakeup();
1706 } catch (InterruptedException e) {
1707 LOG.warn("Async task runner failed to wakeup ", e);
1708 }
1709 }
1710
1711 private boolean isSlave() {
1712 return broker.getBrokerService().isSlave();
1713 }
1714
1715 private void doPageIn(boolean force) throws Exception {
1716 PendingList newlyPaged = doPageInForDispatch(force);
1717 pagedInPendingDispatchLock.writeLock().lock();
1718 try {
1719 if (pagedInPendingDispatch.isEmpty()) {
1720 pagedInPendingDispatch.addAll(newlyPaged);
1721
1722 } else {
1723 for (MessageReference qmr : newlyPaged) {
1724 if (!pagedInPendingDispatch.contains(qmr)) {
1725 pagedInPendingDispatch.addMessageLast(qmr);
1726 }
1727 }
1728 }
1729 } finally {
1730 pagedInPendingDispatchLock.writeLock().unlock();
1731 }
1732 }
1733
1734 private PendingList doPageInForDispatch(boolean force) throws Exception {
1735 List<QueueMessageReference> result = null;
1736 PendingList resultList = null;
1737
1738 int toPageIn = Math.min(getMaxPageSize(), messages.size());
1739 if (LOG.isDebugEnabled()) {
1740 LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
1741 + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
1742 + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount()
1743 + ", dequeueCount: " + destinationStatistics.getDequeues().getCount());
1744 }
1745
1746 if (isLazyDispatch() && !force) {
1747 // Only page in the minimum number of messages which can be
1748 // dispatched immediately.
1749 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
1750 }
1751 int pagedInPendingSize = 0;
1752 pagedInPendingDispatchLock.readLock().lock();
1753 try {
1754 pagedInPendingSize = pagedInPendingDispatch.size();
1755 } finally {
1756 pagedInPendingDispatchLock.readLock().unlock();
1757 }
1758 if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
1759 int count = 0;
1760 result = new ArrayList<QueueMessageReference>(toPageIn);
1761 messagesLock.writeLock().lock();
1762 try {
1763 try {
1764 messages.setMaxBatchSize(toPageIn);
1765 messages.reset();
1766 while (messages.hasNext() && count < toPageIn) {
1767 MessageReference node = messages.next();
1768 messages.remove();
1769
1770 QueueMessageReference ref = createMessageReference(node.getMessage());
1771 if (ref.isExpired()) {
1772 if (broker.isExpired(ref)) {
1773 messageExpired(createConnectionContext(), ref);
1774 } else {
1775 ref.decrementReferenceCount();
1776 }
1777 } else {
1778 result.add(ref);
1779 count++;
1780 }
1781 }
1782 } finally {
1783 messages.release();
1784 }
1785 } finally {
1786 messagesLock.writeLock().unlock();
1787 }
1788 // Only add new messages, not already pagedIn to avoid multiple
1789 // dispatch attempts
1790 pagedInMessagesLock.writeLock().lock();
1791 try {
1792 if(isPrioritizedMessages()) {
1793 resultList = new PrioritizedPendingList();
1794 } else {
1795 resultList = new OrderedPendingList();
1796 }
1797 for (QueueMessageReference ref : result) {
1798 if (!pagedInMessages.containsKey(ref.getMessageId())) {
1799 pagedInMessages.put(ref.getMessageId(), ref);
1800 resultList.addMessageLast(ref);
1801 } else {
1802 ref.decrementReferenceCount();
1803 }
1804 }
1805 } finally {
1806 pagedInMessagesLock.writeLock().unlock();
1807 }
1808 } else {
1809 // Avoid return null list, if condition is not validated
1810 resultList = new OrderedPendingList();
1811 }
1812
1813 return resultList;
1814 }
1815
1816 private void doDispatch(PendingList list) throws Exception {
1817 boolean doWakeUp = false;
1818
1819 pagedInPendingDispatchLock.writeLock().lock();
1820 try {
1821 if (!redeliveredWaitingDispatch.isEmpty()) {
1822 // Try first to dispatch redelivered messages to keep an
1823 // proper order
1824 redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
1825 }
1826 if (!pagedInPendingDispatch.isEmpty()) {
1827 // Next dispatch anything that had not been
1828 // dispatched before.
1829 pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
1830 }
1831 // and now see if we can dispatch the new stuff.. and append to
1832 // the pending
1833 // list anything that does not actually get dispatched.
1834 if (list != null && !list.isEmpty()) {
1835 if (pagedInPendingDispatch.isEmpty()) {
1836 pagedInPendingDispatch.addAll(doActualDispatch(list));
1837 } else {
1838 for (MessageReference qmr : list) {
1839 if (!pagedInPendingDispatch.contains(qmr)) {
1840 pagedInPendingDispatch.addMessageLast(qmr);
1841 }
1842 }
1843 doWakeUp = true;
1844 }
1845 }
1846 } finally {
1847 pagedInPendingDispatchLock.writeLock().unlock();
1848 }
1849
1850 if (doWakeUp) {
1851 // avoid lock order contention
1852 asyncWakeup();
1853 }
1854 }
1855
1856 /**
1857 * @return list of messages that could get dispatched to consumers if they
1858 * were not full.
1859 */
1860 private PendingList doActualDispatch(PendingList list) throws Exception {
1861 List<Subscription> consumers;
1862 consumersLock.writeLock().lock();
1863
1864 try {
1865 if (this.consumers.isEmpty() || isSlave()) {
1866 // slave dispatch happens in processDispatchNotification
1867 return list;
1868 }
1869 consumers = new ArrayList<Subscription>(this.consumers);
1870 }finally {
1871 consumersLock.writeLock().unlock();
1872 }
1873
1874 PendingList rc;
1875 if(isPrioritizedMessages()) {
1876 rc = new PrioritizedPendingList();
1877 } else {
1878 rc = new OrderedPendingList();
1879 }
1880
1881 Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
1882
1883 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
1884
1885 MessageReference node = (MessageReference) iterator.next();
1886 Subscription target = null;
1887 int interestCount = 0;
1888 for (Subscription s : consumers) {
1889 if (s instanceof QueueBrowserSubscription) {
1890 interestCount++;
1891 continue;
1892 }
1893 if (!fullConsumers.contains(s)) {
1894 if (!s.isFull()) {
1895 if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
1896 // Dispatch it.
1897 s.add(node);
1898 target = s;
1899 break;
1900 }
1901 } else {
1902 // no further dispatch of list to a full consumer to
1903 // avoid out of order message receipt
1904 fullConsumers.add(s);
1905 LOG.trace("Subscription full {}", s);
1906 }
1907 }
1908 // make sure it gets dispatched again
1909 if (!node.isDropped()) {
1910 interestCount++;
1911 }
1912 }
1913
1914 if ((target == null && interestCount > 0) || consumers.size() == 0) {
1915 // This means all subs were full or that there are no
1916 // consumers...
1917 rc.addMessageLast((QueueMessageReference) node);
1918 }
1919
1920 // If it got dispatched, rotate the consumer list to get round robin
1921 // distribution.
1922 if (target != null && !strictOrderDispatch && consumers.size() > 1
1923 && !dispatchSelector.isExclusiveConsumer(target)) {
1924 consumersLock.writeLock().lock();
1925 try {
1926 if (removeFromConsumerList(target)) {
1927 addToConsumerList(target);
1928 consumers = new ArrayList<Subscription>(this.consumers);
1929 }
1930 }finally {
1931 consumersLock.writeLock().unlock();
1932 }
1933 }
1934 }
1935
1936 return rc;
1937 }
1938
1939 protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
1940 boolean result = true;
1941 // Keep message groups together.
1942 String groupId = node.getGroupID();
1943 int sequence = node.getGroupSequence();
1944 if (groupId != null) {
1945
1946 MessageGroupMap messageGroupOwners = getMessageGroupOwners();
1947 // If we can own the first, then no-one else should own the
1948 // rest.
1949 if (sequence == 1) {
1950 assignGroup(subscription, messageGroupOwners, node, groupId);
1951 } else {
1952
1953 // Make sure that the previous owner is still valid, we may
1954 // need to become the new owner.
1955 ConsumerId groupOwner;
1956
1957 groupOwner = messageGroupOwners.get(groupId);
1958 if (groupOwner == null) {
1959 assignGroup(subscription, messageGroupOwners, node, groupId);
1960 } else {
1961 if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
1962 // A group sequence < 1 is an end of group signal.
1963 if (sequence < 0) {
1964 messageGroupOwners.removeGroup(groupId);
1965 subscription.getConsumerInfo().setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);
1966 }
1967 } else {
1968 result = false;
1969 }
1970 }
1971 }
1972 }
1973
1974 return result;
1975
1976 }
1977
1978 protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
1979 messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
1980 Message message = n.getMessage();
1981 if (message instanceof ActiveMQMessage) {
1982 ActiveMQMessage activeMessage = (ActiveMQMessage) message;
1983 try {
1984 activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
1985 } catch (JMSException e) {
1986 LOG.warn("Failed to set boolean header: " + e, e);
1987 }
1988 }
1989 subs.getConsumerInfo().setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1);
1990 }
1991
1992 protected void pageInMessages(boolean force) throws Exception {
1993 doDispatch(doPageInForDispatch(force));
1994 }
1995
1996 private void addToConsumerList(Subscription sub) {
1997 if (useConsumerPriority) {
1998 consumers.add(sub);
1999 Collections.sort(consumers, orderedCompare);
2000 } else {
2001 consumers.add(sub);
2002 }
2003 }
2004
2005 private boolean removeFromConsumerList(Subscription sub) {
2006 return consumers.remove(sub);
2007 }
2008
2009 private int getConsumerMessageCountBeforeFull() throws Exception {
2010 int total = 0;
2011 boolean zeroPrefetch = false;
2012 consumersLock.readLock().lock();
2013 try{
2014 for (Subscription s : consumers) {
2015 zeroPrefetch |= s.getPrefetchSize() == 0;
2016 int countBeforeFull = s.countBeforeFull();
2017 total += countBeforeFull;
2018 }
2019 }finally {
2020 consumersLock.readLock().unlock();
2021 }
2022 if (total == 0 && zeroPrefetch) {
2023 total = 1;
2024 }
2025 return total;
2026 }
2027
2028 /*
2029 * In slave mode, dispatch is ignored till we get this notification as the
2030 * dispatch process is non deterministic between master and slave. On a
2031 * notification, the actual dispatch to the subscription (as chosen by the
2032 * master) is completed. (non-Javadoc)
2033 * @see
2034 * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
2035 * (org.apache.activemq.command.MessageDispatchNotification)
2036 */
2037 @Override
2038 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
2039 // do dispatch
2040 Subscription sub = getMatchingSubscription(messageDispatchNotification);
2041 if (sub != null) {
2042 MessageReference message = getMatchingMessage(messageDispatchNotification);
2043 sub.add(message);
2044 sub.processMessageDispatchNotification(messageDispatchNotification);
2045 }
2046 }
2047
2048 private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
2049 throws Exception {
2050 QueueMessageReference message = null;
2051 MessageId messageId = messageDispatchNotification.getMessageId();
2052
2053 pagedInPendingDispatchLock.writeLock().lock();
2054 try {
2055 for (MessageReference ref : pagedInPendingDispatch) {
2056 if (messageId.equals(ref.getMessageId())) {
2057 message = (QueueMessageReference)ref;
2058 pagedInPendingDispatch.remove(ref);
2059 break;
2060 }
2061 }
2062 } finally {
2063 pagedInPendingDispatchLock.writeLock().unlock();
2064 }
2065
2066 if (message == null) {
2067 pagedInMessagesLock.readLock().lock();
2068 try {
2069 message = pagedInMessages.get(messageId);
2070 } finally {
2071 pagedInMessagesLock.readLock().unlock();
2072 }
2073 }
2074
2075 if (message == null) {
2076 messagesLock.writeLock().lock();
2077 try {
2078 try {
2079 messages.setMaxBatchSize(getMaxPageSize());
2080 messages.reset();
2081 while (messages.hasNext()) {
2082 MessageReference node = messages.next();
2083 messages.remove();
2084 if (messageId.equals(node.getMessageId())) {
2085 message = this.createMessageReference(node.getMessage());
2086 break;
2087 }
2088 }
2089 } finally {
2090 messages.release();
2091 }
2092 } finally {
2093 messagesLock.writeLock().unlock();
2094 }
2095 }
2096
2097 if (message == null) {
2098 Message msg = loadMessage(messageId);
2099 if (msg != null) {
2100 message = this.createMessageReference(msg);
2101 }
2102 }
2103
2104 if (message == null) {
2105 throw new JMSException("Slave broker out of sync with master - Message: "
2106 + messageDispatchNotification.getMessageId() + " on "
2107 + messageDispatchNotification.getDestination() + " does not exist among pending("
2108 + pagedInPendingDispatch.size() + ") for subscription: "
2109 + messageDispatchNotification.getConsumerId());
2110 }
2111 return message;
2112 }
2113
2114 /**
2115 * Find a consumer that matches the id in the message dispatch notification
2116 *
2117 * @param messageDispatchNotification
2118 * @return sub or null if the subscription has been removed before dispatch
2119 * @throws JMSException
2120 */
2121 private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
2122 throws JMSException {
2123 Subscription sub = null;
2124 consumersLock.readLock().lock();
2125 try {
2126 for (Subscription s : consumers) {
2127 if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
2128 sub = s;
2129 break;
2130 }
2131 }
2132 }finally {
2133 consumersLock.readLock().unlock();
2134 }
2135 return sub;
2136 }
2137
2138 public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
2139 if (oldPercentUsage > newPercentUsage) {
2140 asyncWakeup();
2141 }
2142 }
2143
2144 @Override
2145 protected Logger getLog() {
2146 return LOG;
2147 }
2148
2149 protected boolean isOptimizeStorage(){
2150 boolean result = false;
2151 if (isDoOptimzeMessageStorage()){
2152 consumersLock.readLock().lock();
2153 try{
2154 if (consumers.isEmpty()==false){
2155 result = true;
2156 for (Subscription s : consumers) {
2157 if (s.getPrefetchSize()==0){
2158 result = false;
2159 break;
2160 }
2161 if (s.isSlowConsumer()){
2162 result = false;
2163 break;
2164 }
2165 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
2166 result = false;
2167 break;
2168 }
2169 }
2170 }
2171 }finally {
2172 consumersLock.readLock().unlock();
2173 }
2174 }
2175 return result;
2176 }
2177 }