001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.*;
021    import java.util.concurrent.CancellationException;
022    import java.util.concurrent.ConcurrentHashMap;
023    import java.util.concurrent.ConcurrentLinkedQueue;
024    import java.util.concurrent.CountDownLatch;
025    import java.util.concurrent.DelayQueue;
026    import java.util.concurrent.Delayed;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Future;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.atomic.AtomicLong;
031    import java.util.concurrent.locks.Lock;
032    import java.util.concurrent.locks.ReentrantLock;
033    import java.util.concurrent.locks.ReentrantReadWriteLock;
034    
035    import javax.jms.InvalidSelectorException;
036    import javax.jms.JMSException;
037    import javax.jms.ResourceAllocationException;
038    import org.apache.activemq.broker.BrokerService;
039    import org.apache.activemq.broker.ConnectionContext;
040    import org.apache.activemq.broker.ProducerBrokerExchange;
041    import org.apache.activemq.broker.region.cursors.OrderedPendingList;
042    import org.apache.activemq.broker.region.cursors.PendingList;
043    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
044    import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
045    import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
046    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
047    import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory;
048    import org.apache.activemq.broker.region.group.MessageGroupMap;
049    import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
050    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
051    import org.apache.activemq.broker.region.policy.DispatchPolicy;
052    import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
053    import org.apache.activemq.broker.util.InsertionCountList;
054    import org.apache.activemq.command.*;
055    import org.apache.activemq.filter.BooleanExpression;
056    import org.apache.activemq.filter.MessageEvaluationContext;
057    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
058    import org.apache.activemq.selector.SelectorParser;
059    import org.apache.activemq.state.ProducerState;
060    import org.apache.activemq.store.MessageRecoveryListener;
061    import org.apache.activemq.store.MessageStore;
062    import org.apache.activemq.thread.Task;
063    import org.apache.activemq.thread.TaskRunner;
064    import org.apache.activemq.thread.TaskRunnerFactory;
065    import org.apache.activemq.transaction.Synchronization;
066    import org.apache.activemq.transaction.Transaction;
067    import org.apache.activemq.usage.Usage;
068    import org.apache.activemq.usage.UsageListener;
069    import org.apache.activemq.util.BrokerSupport;
070    import org.apache.activemq.util.ThreadPoolUtils;
071    import org.slf4j.Logger;
072    import org.slf4j.LoggerFactory;
073    import org.slf4j.MDC;
074    
075    /**
076     * The Queue is a List of MessageEntry objects that are dispatched to matching
077     * subscriptions.
078     */
079    public class Queue extends BaseDestination implements Task, UsageListener {
    protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
    // Factory handed in through the constructor; initialize() uses it to create taskRunner.
    protected final TaskRunnerFactory taskFactory;
    // Created in initialize() with the name "Queue:" + physical destination name.
    protected TaskRunner taskRunner;
    // Guards the consumers list: getConsumers() copies under the read lock,
    // addSubscription()/removeSubscription() mutate under the write lock.
    private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
    protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
    // Write-locked while messages are appended to the cursor during store recovery.
    private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
    // Pending message cursor; when still null, initialize() picks a VMPendingMessageCursor
    // or a StoreQueueCursor.
    protected PendingMessageCursor messages;
    // NOTE(review): presumably guards pagedInMessages - not exercised in the code visible here.
    private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
    private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
    // Messages that are paged in but have not yet been targeted at a subscription
    // (the lock below is write-locked by addSubscription()/removeSubscription() to
    // hold off dispatch while the consumer set changes)
    private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
    // Both lists are swapped between ordered and prioritized implementations by
    // setPrioritizedMessages().
    protected PendingList pagedInPendingDispatch = new OrderedPendingList();
    // Receives the unacknowledged messages handed back when a subscription is removed.
    protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
    private MessageGroupMap messageGroupOwners;
    private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
    private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory();
    final Lock sendLock = new ReentrantLock();
    private ExecutorService executor;
    // Sends parked by producer flow control, keyed by message id. Always accessed
    // while synchronized on the map itself.
    private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();
    private boolean useConsumerPriority = true;
    private boolean strictOrderDispatch = false;
    // Holds the current exclusive consumer, maintained by addSubscription()/removeSubscription().
    private final QueueDispatchSelector dispatchSelector;
    // When true, addSubscription()/removeSubscription() call wakeup() only after
    // releasing pagedInPendingDispatchLock.
    private boolean optimizedDispatch = false;
    private boolean iterationRunning = false;
    // Set by addSubscription() when a subscription arrives while the consumer list is empty.
    private boolean firstConsumer = false;
    private int timeBeforeDispatchStarts = 0;
    private int consumersBeforeDispatchStarts = 0;
    // Created by addSubscription() for the first consumer (count is
    // consumersBeforeDispatchStarts - 1) and counted down by each later consumer.
    private CountDownLatch consumersBeforeStartsLatch;
    private final AtomicLong pendingWakeups = new AtomicLong();
    private boolean allConsumersExclusiveByDefault = false;

    // Task that simply triggers an asynchronous wakeup of this queue.
    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
        @Override
        public void run() {
            asyncWakeup();
        }
    };
    // Task that runs an expiry pass over this queue.
    private final Runnable expireMessagesTask = new Runnable() {
        @Override
        public void run() {
            expireMessages();
        }
    };

    // NOTE(review): per the lock-hierarchy comment in addSubscription(), this monitor is
    // acquired before the dispatch lock - confirm against iterate().
    private final Object iteratingMutex = new Object();
125    
126        class TimeoutMessage implements Delayed {
127    
128            Message message;
129            ConnectionContext context;
130            long trigger;
131    
132            public TimeoutMessage(Message message, ConnectionContext context, long delay) {
133                this.message = message;
134                this.context = context;
135                this.trigger = System.currentTimeMillis() + delay;
136            }
137    
138            @Override
139            public long getDelay(TimeUnit unit) {
140                long n = trigger - System.currentTimeMillis();
141                return unit.convert(n, TimeUnit.MILLISECONDS);
142            }
143    
144            @Override
145            public int compareTo(Delayed delayed) {
146                long other = ((TimeoutMessage) delayed).trigger;
147                int returnValue;
148                if (this.trigger < other) {
149                    returnValue = -1;
150                } else if (this.trigger > other) {
151                    returnValue = 1;
152                } else {
153                    returnValue = 0;
154                }
155                return returnValue;
156            }
157        }
158    
159        DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
160    
161        class FlowControlTimeoutTask extends Thread {
162    
163            @Override
164            public void run() {
165                TimeoutMessage timeout;
166                try {
167                    while (true) {
168                        timeout = flowControlTimeoutMessages.take();
169                        if (timeout != null) {
170                            synchronized (messagesWaitingForSpace) {
171                                if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
172                                    ExceptionResponse response = new ExceptionResponse(
173                                            new ResourceAllocationException(
174                                                    "Usage Manager Memory Limit reached. Stopping producer ("
175                                                            + timeout.message.getProducerId()
176                                                            + ") to prevent flooding "
177                                                            + getActiveMQDestination().getQualifiedName()
178                                                            + "."
179                                                            + " See http://activemq.apache.org/producer-flow-control.html for more info"));
180                                    response.setCorrelationId(timeout.message.getCommandId());
181                                    timeout.context.getConnection().dispatchAsync(response);
182                                }
183                            }
184                        }
185                    }
186                } catch (InterruptedException e) {
187                    LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
188                }
189            }
190        };
191    
192        private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
193    
194        private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
195    
196            @Override
197            public int compare(Subscription s1, Subscription s2) {
198                // We want the list sorted in descending order
199                int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
200                if (val == 0 && messageGroupOwners != null) {
201                    // then ascending order of assigned message groups to favour less loaded consumers
202                    // Long.compare in jdk7
203                    long x = s1.getConsumerInfo().getLastDeliveredSequenceId();
204                    long y = s2.getConsumerInfo().getLastDeliveredSequenceId();
205                    val = (x < y) ? -1 : ((x == y) ? 0 : 1);
206                }
207                return val;
208            }
209        };
210    
211        public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
212                DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
213            super(brokerService, store, destination, parentStats);
214            this.taskFactory = taskFactory;
215            this.dispatchSelector = new QueueDispatchSelector(destination);
216        }
217    
218        @Override
219        public List<Subscription> getConsumers() {
220            consumersLock.readLock().lock();
221            try {
222                return new ArrayList<Subscription>(consumers);
223            } finally {
224                consumersLock.readLock().unlock();
225            }
226        }
227    
    /**
     * Thread subclass that carries a reference to its queue, so the queue is
     * easily visible in the debugger from its task runner threads.
     */
    final class QueueThread extends Thread {
        // the queue this thread works for; kept only for inspection
        final Queue queue;

        /**
         * @param runnable the work to run on this thread
         * @param name the thread name
         * @param queue the queue to expose on this thread
         */
        public QueueThread(Runnable runnable, String name, Queue queue) {
            super(runnable, name);
            this.queue = queue;
        }
    }
238    
239        class BatchMessageRecoveryListener implements MessageRecoveryListener {
240            final LinkedList<Message> toExpire = new LinkedList<Message>();
241            final double totalMessageCount;
242            int recoveredAccumulator = 0;
243            int currentBatchCount;
244    
245            BatchMessageRecoveryListener(int totalMessageCount) {
246                this.totalMessageCount = totalMessageCount;
247                currentBatchCount = recoveredAccumulator;
248            }
249    
250            @Override
251            public boolean recoverMessage(Message message) {
252                recoveredAccumulator++;
253                if ((recoveredAccumulator % 10000) == 0) {
254                    LOG.info("cursor for {} has recovered {} messages. {}% complete", new Object[]{ getActiveMQDestination().getQualifiedName(), recoveredAccumulator, new Integer((int) (recoveredAccumulator * 100 / totalMessageCount))});
255                }
256                // Message could have expired while it was being
257                // loaded..
258                if (message.isExpired() && broker.isExpired(message)) {
259                    toExpire.add(message);
260                    return true;
261                }
262                if (hasSpace()) {
263                    message.setRegionDestination(Queue.this);
264                    messagesLock.writeLock().lock();
265                    try {
266                        try {
267                            messages.addMessageLast(message);
268                        } catch (Exception e) {
269                            LOG.error("Failed to add message to cursor", e);
270                        }
271                    } finally {
272                        messagesLock.writeLock().unlock();
273                    }
274                    destinationStatistics.getMessages().increment();
275                    return true;
276                }
277                return false;
278            }
279    
280            @Override
281            public boolean recoverMessageReference(MessageId messageReference) throws Exception {
282                throw new RuntimeException("Should not be called.");
283            }
284    
285            @Override
286            public boolean hasSpace() {
287                return true;
288            }
289    
290            @Override
291            public boolean isDuplicate(MessageId id) {
292                return false;
293            }
294    
295            public void reset() {
296                currentBatchCount = recoveredAccumulator;
297            }
298    
299            public void processExpired() {
300                for (Message message: toExpire) {
301                    messageExpired(createConnectionContext(), createMessageReference(message));
302                    // drop message will decrement so counter
303                    // balance here
304                    destinationStatistics.getMessages().increment();
305                }
306                toExpire.clear();
307            }
308    
309            public boolean done() {
310                return currentBatchCount == recoveredAccumulator;
311            }
312        }
313    
314        @Override
315        public void setPrioritizedMessages(boolean prioritizedMessages) {
316            super.setPrioritizedMessages(prioritizedMessages);
317    
318            if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
319                pagedInPendingDispatch = new PrioritizedPendingList();
320                redeliveredWaitingDispatch = new PrioritizedPendingList();
321            } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
322                pagedInPendingDispatch = new OrderedPendingList();
323                redeliveredWaitingDispatch = new OrderedPendingList();
324            }
325        }
326    
327        @Override
328        public void initialize() throws Exception {
329    
330            if (this.messages == null) {
331                if (destination.isTemporary() || broker == null || store == null) {
332                    this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
333                } else {
334                    this.messages = new StoreQueueCursor(broker, this);
335                }
336            }
337    
338            // If a VMPendingMessageCursor don't use the default Producer System
339            // Usage
340            // since it turns into a shared blocking queue which can lead to a
341            // network deadlock.
342            // If we are cursoring to disk..it's not and issue because it does not
343            // block due
344            // to large disk sizes.
345            if (messages instanceof VMPendingMessageCursor) {
346                this.systemUsage = brokerService.getSystemUsage();
347                memoryUsage.setParent(systemUsage.getMemoryUsage());
348            }
349    
350            this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
351    
352            super.initialize();
353            if (store != null) {
354                // Restore the persistent messages.
355                messages.setSystemUsage(systemUsage);
356                messages.setEnableAudit(isEnableAudit());
357                messages.setMaxAuditDepth(getMaxAuditDepth());
358                messages.setMaxProducersToAudit(getMaxProducersToAudit());
359                messages.setUseCache(isUseCache());
360                messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
361                final int messageCount = store.getMessageCount();
362                if (messageCount > 0 && messages.isRecoveryRequired()) {
363                    BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
364                    do {
365                       listener.reset();
366                       store.recoverNextMessages(getMaxPageSize(), listener);
367                       listener.processExpired();
368                   } while (!listener.done());
369                } else {
370                    destinationStatistics.getMessages().setCount(messageCount);
371                }
372            }
373        }
374    
375        /*
376         * Holder for subscription that needs attention on next iterate browser
377         * needs access to existing messages in the queue that have already been
378         * dispatched
379         */
380        class BrowserDispatch {
381            QueueBrowserSubscription browser;
382    
383            public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
384                browser = browserSubscription;
385                browser.incrementQueueRef();
386            }
387    
388            void done() {
389                try {
390                    browser.decrementQueueRef();
391                } catch (Exception e) {
392                    LOG.warn("decrement ref on browser: " + browser, e);
393                }
394            }
395    
396            public QueueBrowserSubscription getBrowser() {
397                return browser;
398            }
399        }
400    
401        ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
402    
403        @Override
404        public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
405            LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() });
406    
407            super.addSubscription(context, sub);
408            // synchronize with dispatch method so that no new messages are sent
409            // while setting up a subscription. avoid out of order messages,
410            // duplicates, etc.
411            pagedInPendingDispatchLock.writeLock().lock();
412            try {
413    
414                sub.add(context, this);
415    
416                // needs to be synchronized - so no contention with dispatching
417                // consumersLock.
418                consumersLock.writeLock().lock();
419                try {
420                    // set a flag if this is a first consumer
421                    if (consumers.size() == 0) {
422                        firstConsumer = true;
423                        if (consumersBeforeDispatchStarts != 0) {
424                            consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
425                        }
426                    } else {
427                        if (consumersBeforeStartsLatch != null) {
428                            consumersBeforeStartsLatch.countDown();
429                        }
430                    }
431    
432                    addToConsumerList(sub);
433                    if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
434                        Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
435                        if (exclusiveConsumer == null) {
436                            exclusiveConsumer = sub;
437                        } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
438                            sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
439                            exclusiveConsumer = sub;
440                        }
441                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
442                    }
443                } finally {
444                    consumersLock.writeLock().unlock();
445                }
446    
447                if (sub instanceof QueueBrowserSubscription) {
448                    // tee up for dispatch in next iterate
449                    QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
450                    BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
451                    browserDispatches.add(browserDispatch);
452                }
453    
454                if (!this.optimizedDispatch) {
455                    wakeup();
456                }
457            } finally {
458                pagedInPendingDispatchLock.writeLock().unlock();
459            }
460            if (this.optimizedDispatch) {
461                // Outside of dispatchLock() to maintain the lock hierarchy of
462                // iteratingMutex -> dispatchLock. - see
463                // https://issues.apache.org/activemq/browse/AMQ-1878
464                wakeup();
465            }
466        }
467    
    /**
     * Detaches a subscription from this queue and hands its unacknowledged
     * messages back for redelivery.
     * <p>
     * Runs under the pending-dispatch write lock and the consumers write lock:
     * the subscription is dropped from the consumer list, a replacement
     * exclusive consumer is chosen if needed, its message-group ownership is
     * released, and every unacked message it returns is unlocked and appended
     * to {@code redeliveredWaitingDispatch} (unless dropped). The queue is then
     * woken up.
     *
     * @param context the connection context of the client
     * @param sub the subscription to remove
     * @param lastDeiveredSequenceId broker sequence id of the last message the
     *        consumer reports as delivered; 0 means no delivery information is
     *        available
     * @throws Exception if the base destination or the subscription fails the removal
     */
    @Override
    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
            throws Exception {
        super.removeSubscription(context, sub, lastDeiveredSequenceId);
        // synchronize with dispatch method so that no new messages are sent
        // while removing a subscription.
        pagedInPendingDispatchLock.writeLock().lock();
        try {
            LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{
                    getActiveMQDestination().getQualifiedName(),
                    sub,
                    lastDeiveredSequenceId,
                    getDestinationStatistics().getDequeues().getCount(),
                    getDestinationStatistics().getDispatched().getCount(),
                    getDestinationStatistics().getInflight().getCount()
            });
            consumersLock.writeLock().lock();
            try {
                removeFromConsumerList(sub);
                if (sub.getConsumerInfo().isExclusive()) {
                    Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
                    if (exclusiveConsumer == sub) {
                        // the departing sub held exclusivity: promote the highest-priority
                        // remaining exclusive consumer, or nobody if there is none
                        exclusiveConsumer = null;
                        for (Subscription s : consumers) {
                            if (s.getConsumerInfo().isExclusive()
                                    && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
                                            .getConsumerInfo().getPriority())) {
                                exclusiveConsumer = s;

                            }
                        }
                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                    }
                } else if (isAllConsumersExclusiveByDefault()) {
                    // every consumer is implicitly exclusive: re-elect the highest-priority
                    // one among those that remain
                    Subscription exclusiveConsumer = null;
                    for (Subscription s : consumers) {
                        if (exclusiveConsumer == null
                                || s.getConsumerInfo().getPriority() > exclusiveConsumer
                                .getConsumerInfo().getPriority()) {
                            exclusiveConsumer = s;
                                }
                    }
                    dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                }
                ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
                getMessageGroupOwners().removeConsumer(consumerId);

                // redeliver inflight messages

                boolean markAsRedelivered = false;
                MessageReference lastDeliveredRef = null;
                List<MessageReference> unAckedMessages = sub.remove(context, this);

                // locate last redelivered in unconsumed list (list in delivery rather than seq order)
                if (lastDeiveredSequenceId != 0) {
                    for (MessageReference ref : unAckedMessages) {
                        if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
                            lastDeliveredRef = ref;
                            markAsRedelivered = true;
                            LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeiveredSequenceId, ref.getMessageId());
                            break;
                        }
                    }
                }

                for (MessageReference ref : unAckedMessages) {
                    QueueMessageReference qmr = (QueueMessageReference) ref;
                    if (qmr.getLockOwner() == sub) {
                        qmr.unlock();

                        // have no delivery information
                        if (lastDeiveredSequenceId == 0) {
                            qmr.incrementRedeliveryCounter();
                        } else {
                            // count a redelivery for everything up to and including the
                            // last delivered message (if it was found above)
                            if (markAsRedelivered) {
                                qmr.incrementRedeliveryCounter();
                            }
                            if (ref == lastDeliveredRef) {
                                // all that follow were not redelivered
                                markAsRedelivered = false;
                            }
                        }
                    }
                    if (!qmr.isDropped()) {
                        redeliveredWaitingDispatch.addMessageLast(qmr);
                    }
                }
                if (sub instanceof QueueBrowserSubscription) {
                    ((QueueBrowserSubscription)sub).decrementQueueRef();
                    // NOTE(review): browserDispatches holds BrowserDispatch wrappers, and
                    // BrowserDispatch as declared in this file does not override equals(),
                    // so removing by the subscription itself looks like a no-op - confirm
                    // whether the wrapper for this browser should be removed instead.
                    browserDispatches.remove(sub);
                }
                if (!redeliveredWaitingDispatch.isEmpty()) {
                    // NOTE(review): presumably dispatches the redelivery list to the
                    // remaining consumers - doDispatch is not visible here, confirm.
                    doDispatch(new OrderedPendingList());
                }
            } finally {
                consumersLock.writeLock().unlock();
            }
            if (!this.optimizedDispatch) {
                wakeup();
            }
        } finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }
        if (this.optimizedDispatch) {
            // Outside of dispatchLock() to maintain the lock hierarchy of
            // iteratingMutex -> dispatchLock. - see
            // https://issues.apache.org/activemq/browse/AMQ-1878
            wakeup();
        }
    }
578    
579        @Override
580        public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
581            final ConnectionContext context = producerExchange.getConnectionContext();
582            // There is delay between the client sending it and it arriving at the
583            // destination.. it may have expired.
584            message.setRegionDestination(this);
585            ProducerState state = producerExchange.getProducerState();
586            if (state == null) {
587                LOG.warn("Send failed for: {}, missing producer state for: {}", message, producerExchange);
588                throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
589            }
590            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
591            final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
592                    && !context.isInRecoveryMode();
593            if (message.isExpired()) {
594                // message not stored - or added to stats yet - so chuck here
595                broker.getRoot().messageExpired(context, message, null);
596                if (sendProducerAck) {
597                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
598                    context.getConnection().dispatchAsync(ack);
599                }
600                return;
601            }
602            if (memoryUsage.isFull()) {
603                isFull(context, memoryUsage);
604                fastProducer(context, producerInfo);
605                if (isProducerFlowControl() && context.isProducerFlowControl()) {
606                    if (warnOnProducerFlowControl) {
607                        warnOnProducerFlowControl = false;
608                        LOG.info("Usage Manager Memory Limit ({}) reached on {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
609                                        memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName());
610                    }
611    
612                    if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
613                        throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
614                                + message.getProducerId() + ") to prevent flooding "
615                                + getActiveMQDestination().getQualifiedName() + "."
616                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
617                    }
618    
619                    // We can avoid blocking due to low usage if the producer is
620                    // sending
621                    // a sync message or if it is using a producer window
622                    if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
623                        // copy the exchange state since the context will be
624                        // modified while we are waiting
625                        // for space.
626                        final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
627                        synchronized (messagesWaitingForSpace) {
628                         // Start flow control timeout task
629                            // Prevent trying to start it multiple times
630                            if (!flowControlTimeoutTask.isAlive()) {
631                                flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task");
632                                flowControlTimeoutTask.start();
633                            }
634                            messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
635                                @Override
636                                public void run() {
637    
638                                    try {
639                                        // While waiting for space to free up... the
640                                        // message may have expired.
641                                        if (message.isExpired()) {
642                                            LOG.error("expired waiting for space..");
643                                            broker.messageExpired(context, message, null);
644                                            destinationStatistics.getExpired().increment();
645                                        } else {
646                                            doMessageSend(producerExchangeCopy, message);
647                                        }
648    
649                                        if (sendProducerAck) {
650                                            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
651                                                    .getSize());
652                                            context.getConnection().dispatchAsync(ack);
653                                        } else {
654                                            Response response = new Response();
655                                            response.setCorrelationId(message.getCommandId());
656                                            context.getConnection().dispatchAsync(response);
657                                        }
658    
659                                    } catch (Exception e) {
660                                        if (!sendProducerAck && !context.isInRecoveryMode() && !brokerService.isStopping()) {
661                                            ExceptionResponse response = new ExceptionResponse(e);
662                                            response.setCorrelationId(message.getCommandId());
663                                            context.getConnection().dispatchAsync(response);
664                                        } else {
665                                            LOG.debug("unexpected exception on deferred send of: {}", message, e);
666                                        }
667                                    }
668                                }
669                            });
670    
671                            if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
672                                flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
673                                        .getSendFailIfNoSpaceAfterTimeout()));
674                            }
675    
676                            registerCallbackForNotFullNotification();
677                            context.setDontSendReponse(true);
678                            return;
679                        }
680    
681                    } else {
682    
683                        if (memoryUsage.isFull()) {
684                            waitForSpace(context, producerExchange, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
685                                    + message.getProducerId() + ") stopped to prevent flooding "
686                                    + getActiveMQDestination().getQualifiedName() + "."
687                                    + " See http://activemq.apache.org/producer-flow-control.html for more info");
688                        }
689    
690                        // The usage manager could have delayed us by the time
691                        // we unblock the message could have expired..
692                        if (message.isExpired()) {
693                            LOG.debug("Expired message: {}", message);
694                            broker.getRoot().messageExpired(context, message, null);
695                            return;
696                        }
697                    }
698                }
699            }
700            doMessageSend(producerExchange, message);
701            if (sendProducerAck) {
702                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
703                context.getConnection().dispatchAsync(ack);
704            }
705        }
706    
707        private void registerCallbackForNotFullNotification() {
708            // If the usage manager is not full, then the task will not
709            // get called..
710            if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
711                // so call it directly here.
712                sendMessagesWaitingForSpaceTask.run();
713            }
714        }
715    
    // One SendSync per transaction that has sent to this queue. Entries are created in
    // registerSendSync and removed in SendSync.afterCommit / SendSync.afterRollback.
    final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
    // Transactions in the order their beforeCommit ran. SendSync.afterCommit drains committed
    // transactions from the head; all access in SendSync happens inside synchronized (sendLock).
    private volatile LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
718    
719        // roll up all message sends
720        class SendSync extends Synchronization {
721    
722            class MessageContext {
723                public Message message;
724                public ConnectionContext context;
725    
726                public MessageContext(ConnectionContext context, Message message) {
727                    this.context = context;
728                    this.message = message;
729                }
730            }
731    
732            final Transaction transaction;
733            List<MessageContext> additions = new ArrayList<MessageContext>();
734    
735            public SendSync(Transaction transaction) {
736                this.transaction = transaction;
737            }
738    
739            public void add(ConnectionContext context, Message message) {
740                additions.add(new MessageContext(context, message));
741            }
742    
743            @Override
744            public void beforeCommit() throws Exception {
745                synchronized (sendLock) {
746                    orderIndexUpdates.addLast(transaction);
747                }
748            }
749    
750            @Override
751            public void afterCommit() throws Exception {
752                LinkedList<Transaction> orderedWork = new LinkedList<Transaction>();;
753                // use existing object to sync orderIndexUpdates that can be reassigned
754                synchronized (sendLock) {
755                    Transaction next = orderIndexUpdates.peek();
756                    while( next!=null && next.isCommitted() ) {
757                        orderedWork.addLast(orderIndexUpdates.removeFirst());
758                        next = orderIndexUpdates.peek();
759                    }
760                }
761                // do the ordered work
762                if (!orderedWork.isEmpty()) {
763    
764                    ArrayList<SendSync> syncs = new ArrayList<SendSync>(orderedWork.size());;
765                    for (Transaction tx : orderedWork) {
766                        syncs.add(sendSyncs.remove(tx));
767                    }
768                    sendLock.lockInterruptibly();
769                    try {
770                        for (SendSync sync : syncs) {
771                            sync.processSend();
772                        }
773                    } finally {
774                        sendLock.unlock();
775                    }
776                    for (SendSync sync : syncs) {
777                        sync.processSent();
778                    }
779                }
780            }
781    
782            // called with sendLock
783            private void processSend() throws Exception {
784    
785                for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext(); ) {
786                    MessageContext messageContext = iterator.next();
787                    // It could take while before we receive the commit
788                    // op, by that time the message could have expired..
789                    if (broker.isExpired(messageContext.message)) {
790                        broker.messageExpired(messageContext.context, messageContext.message, null);
791                        destinationStatistics.getExpired().increment();
792                        iterator.remove();
793                        continue;
794                    }
795                    sendMessage(messageContext.message);
796                    messageContext.message.decrementReferenceCount();
797                }
798            }
799    
800            private void processSent() throws Exception {
801                for (MessageContext messageContext : additions) {
802                    messageSent(messageContext.context, messageContext.message);
803                }
804            }
805    
806            @Override
807            public void afterRollback() throws Exception {
808                try {
809                    for (MessageContext messageContext : additions) {
810                        messageContext.message.decrementReferenceCount();
811                    }
812                } finally {
813                    sendSyncs.remove(transaction);
814                }
815            }
816        }
817    
818        // called while holding the sendLock
819        private void registerSendSync(Message message, ConnectionContext context) {
820            final Transaction transaction = context.getTransaction();
821            Queue.SendSync currentSync = sendSyncs.get(transaction);
822            if (currentSync == null) {
823                currentSync = new Queue.SendSync(transaction);
824                transaction.addSynchronization(currentSync);
825                sendSyncs.put(transaction, currentSync);
826            }
827            currentSync.add(context, message);
828        }
829    
830        void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
831                Exception {
832            final ConnectionContext context = producerExchange.getConnectionContext();
833            Future<Object> result = null;
834    
835            producerExchange.incrementSend();
836            checkUsage(context, producerExchange, message);
837            sendLock.lockInterruptibly();
838            try {
839                if (store != null && message.isPersistent()) {
840                    message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
841                    if (messages.isCacheEnabled()) {
842                        result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
843                    } else {
844                        store.addMessage(context, message);
845                    }
846                    if (isReduceMemoryFootprint()) {
847                        message.clearMarshalledState();
848                    }
849                }
850                if (context.isInTransaction()) {
851                    // If this is a transacted message.. increase the usage now so that
852                    // a big TX does not blow up
853                    // our memory. This increment is decremented once the tx finishes..
854                    message.incrementReferenceCount();
855    
856                    registerSendSync(message, context);
857                } else {
858                    // Add to the pending list, this takes care of incrementing the
859                    // usage manager.
860                    sendMessage(message);
861                }
862            } finally {
863                sendLock.unlock();
864            }
865            if (!context.isInTransaction()) {
866                messageSent(context, message);
867            }
868            if (result != null && !result.isCancelled()) {
869                try {
870                    result.get();
871                } catch (CancellationException e) {
872                    // ignore - the task has been cancelled if the message
873                    // has already been deleted
874                }
875            }
876        }
877    
878        private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException {
879            if (message.isPersistent()) {
880                if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
881                    final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
882                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
883                        + message.getProducerId() + ") to prevent flooding "
884                        + getActiveMQDestination().getQualifiedName() + "."
885                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
886    
887                    waitForSpace(context, producerBrokerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
888                }
889            } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
890                final String logMessage = "Temp Store is Full ("
891                        + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
892                        +"). Stopping producer (" + message.getProducerId()
893                    + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
894                    + " See http://activemq.apache.org/producer-flow-control.html for more info";
895    
896                waitForSpace(context, producerBrokerExchange, messages.getSystemUsage().getTempUsage(), logMessage);
897            }
898        }
899    
900        private void expireMessages() {
901            LOG.debug("{} expiring messages ..", getActiveMQDestination().getQualifiedName());
902    
903            // just track the insertion count
904            List<Message> browsedMessages = new InsertionCountList<Message>();
905            doBrowse(browsedMessages, this.getMaxExpirePageSize());
906            asyncWakeup();
907            LOG.debug("{} expiring messages done.", getActiveMQDestination().getQualifiedName());
908        }
909    
    /**
     * Intentionally empty: this implementation performs no work on a gc request.
     */
    @Override
    public void gc() {
    }
913    
914        @Override
915        public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
916                throws IOException {
917            messageConsumed(context, node);
918            if (store != null && node.isPersistent()) {
919                store.removeAsyncMessage(context, convertToNonRangedAck(ack, node));
920            }
921        }
922    
923        Message loadMessage(MessageId messageId) throws IOException {
924            Message msg = null;
925            if (store != null) { // can be null for a temp q
926                msg = store.getMessage(messageId);
927                if (msg != null) {
928                    msg.setRegionDestination(this);
929                }
930            }
931            return msg;
932        }
933    
934        @Override
935        public String toString() {
936            int size = 0;
937            messagesLock.readLock().lock();
938            try {
939                size = messages.size();
940            } finally {
941                messagesLock.readLock().unlock();
942            }
943            return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
944                    + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
945                    + messageGroupOwners;
946        }
947    
948        @Override
949        public void start() throws Exception {
950            if (memoryUsage != null) {
951                memoryUsage.start();
952            }
953            if (systemUsage.getStoreUsage() != null) {
954                systemUsage.getStoreUsage().start();
955            }
956            systemUsage.getMemoryUsage().addUsageListener(this);
957            messages.start();
958            if (getExpireMessagesPeriod() > 0) {
959                scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
960            }
961            doPageIn(false);
962        }
963    
964        @Override
965        public void stop() throws Exception {
966            if (taskRunner != null) {
967                taskRunner.shutdown();
968            }
969            if (this.executor != null) {
970                ThreadPoolUtils.shutdownNow(executor);
971                executor = null;
972            }
973    
974            scheduler.cancel(expireMessagesTask);
975    
976            if (flowControlTimeoutTask.isAlive()) {
977                flowControlTimeoutTask.interrupt();
978            }
979    
980            if (messages != null) {
981                messages.stop();
982            }
983    
984            systemUsage.getMemoryUsage().removeUsageListener(this);
985            if (memoryUsage != null) {
986                memoryUsage.stop();
987            }
988            if (store != null) {
989                store.stop();
990            }
991        }
992    
993        // Properties
994        // -------------------------------------------------------------------------
995        @Override
996        public ActiveMQDestination getActiveMQDestination() {
997            return destination;
998        }
999    
1000        public MessageGroupMap getMessageGroupOwners() {
1001            if (messageGroupOwners == null) {
1002                messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
1003            }
1004            return messageGroupOwners;
1005        }
1006    
1007        public DispatchPolicy getDispatchPolicy() {
1008            return dispatchPolicy;
1009        }
1010    
    /**
     * Replaces the dispatch policy held by this queue.
     *
     * @param dispatchPolicy the policy to store
     */
    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }
1014    
1015        public MessageGroupMapFactory getMessageGroupMapFactory() {
1016            return messageGroupMapFactory;
1017        }
1018    
    /**
     * Replaces the factory returned by {@code getMessageGroupMapFactory()}.
     *
     * @param messageGroupMapFactory the factory to store
     */
    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
        this.messageGroupMapFactory = messageGroupMapFactory;
    }
1022    
1023        public PendingMessageCursor getMessages() {
1024            return this.messages;
1025        }
1026    
    /**
     * Replaces the pending message cursor held by this queue.
     *
     * @param messages the cursor to store
     */
    public void setMessages(PendingMessageCursor messages) {
        this.messages = messages;
    }
1030    
1031        public boolean isUseConsumerPriority() {
1032            return useConsumerPriority;
1033        }
1034    
    /**
     * Sets the {@code useConsumerPriority} setting.
     *
     * @param useConsumerPriority the new value
     */
    public void setUseConsumerPriority(boolean useConsumerPriority) {
        this.useConsumerPriority = useConsumerPriority;
    }
1038    
1039        public boolean isStrictOrderDispatch() {
1040            return strictOrderDispatch;
1041        }
1042    
    /**
     * Sets the {@code strictOrderDispatch} setting.
     *
     * @param strictOrderDispatch the new value
     */
    public void setStrictOrderDispatch(boolean strictOrderDispatch) {
        this.strictOrderDispatch = strictOrderDispatch;
    }
1046    
1047        public boolean isOptimizedDispatch() {
1048            return optimizedDispatch;
1049        }
1050    
    /**
     * Sets the {@code optimizedDispatch} setting.
     *
     * @param optimizedDispatch the new value
     */
    public void setOptimizedDispatch(boolean optimizedDispatch) {
        this.optimizedDispatch = optimizedDispatch;
    }
1054    
1055        public int getTimeBeforeDispatchStarts() {
1056            return timeBeforeDispatchStarts;
1057        }
1058    
    /**
     * Sets the {@code timeBeforeDispatchStarts} setting.
     *
     * @param timeBeforeDispatchStarts the new value
     */
    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
    }
1062    
1063        public int getConsumersBeforeDispatchStarts() {
1064            return consumersBeforeDispatchStarts;
1065        }
1066    
    /**
     * Sets the {@code consumersBeforeDispatchStarts} setting.
     *
     * @param consumersBeforeDispatchStarts the new value
     */
    public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
    }
1070    
    /**
     * Sets the {@code allConsumersExclusiveByDefault} setting.
     *
     * @param allConsumersExclusiveByDefault the new value
     */
    public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
        this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
    }
1074    
1075        public boolean isAllConsumersExclusiveByDefault() {
1076            return allConsumersExclusiveByDefault;
1077        }
1078    
1079        // Implementation methods
1080        // -------------------------------------------------------------------------
1081        private QueueMessageReference createMessageReference(Message message) {
1082            QueueMessageReference result = new IndirectMessageReference(message);
1083            return result;
1084        }
1085    
1086        @Override
1087        public Message[] browse() {
1088            List<Message> browseList = new ArrayList<Message>();
1089            doBrowse(browseList, getMaxBrowsePageSize());
1090            return browseList.toArray(new Message[browseList.size()]);
1091        }
1092    
1093        public void doBrowse(List<Message> browseList, int max) {
1094            final ConnectionContext connectionContext = createConnectionContext();
1095            try {
1096                pageInMessages(true);
1097                List<MessageReference> toExpire = new ArrayList<MessageReference>();
1098    
1099                pagedInPendingDispatchLock.writeLock().lock();
1100                try {
1101                    addAll(pagedInPendingDispatch.values(), browseList, max, toExpire);
1102                    for (MessageReference ref : toExpire) {
1103                        pagedInPendingDispatch.remove(ref);
1104                        if (broker.isExpired(ref)) {
1105                            LOG.debug("expiring from pagedInPending: {}", ref);
1106                            messageExpired(connectionContext, ref);
1107                        }
1108                    }
1109                } finally {
1110                    pagedInPendingDispatchLock.writeLock().unlock();
1111                }
1112                toExpire.clear();
1113                pagedInMessagesLock.readLock().lock();
1114                try {
1115                    addAll(pagedInMessages.values(), browseList, max, toExpire);
1116                } finally {
1117                    pagedInMessagesLock.readLock().unlock();
1118                }
1119                for (MessageReference ref : toExpire) {
1120                    if (broker.isExpired(ref)) {
1121                        LOG.debug("expiring from pagedInMessages: {}", ref);
1122                        messageExpired(connectionContext, ref);
1123                    } else {
1124                        pagedInMessagesLock.writeLock().lock();
1125                        try {
1126                            pagedInMessages.remove(ref.getMessageId());
1127                        } finally {
1128                            pagedInMessagesLock.writeLock().unlock();
1129                        }
1130                    }
1131                }
1132    
1133                if (browseList.size() < getMaxBrowsePageSize()) {
1134                    messagesLock.writeLock().lock();
1135                    try {
1136                        try {
1137                            messages.reset();
1138                            while (messages.hasNext() && browseList.size() < max) {
1139                                MessageReference node = messages.next();
1140                                if (node.isExpired()) {
1141                                    if (broker.isExpired(node)) {
1142                                        LOG.debug("expiring from messages: {}", node);
1143                                        messageExpired(connectionContext, createMessageReference(node.getMessage()));
1144                                    }
1145                                    messages.remove();
1146                                } else {
1147                                    messages.rollback(node.getMessageId());
1148                                    if (browseList.contains(node.getMessage()) == false) {
1149                                        browseList.add(node.getMessage());
1150                                    }
1151                                }
1152                                node.decrementReferenceCount();
1153                            }
1154                        } finally {
1155                            messages.release();
1156                        }
1157                    } finally {
1158                        messagesLock.writeLock().unlock();
1159                    }
1160                }
1161            } catch (Exception e) {
1162                LOG.error("Problem retrieving message for browse", e);
1163            }
1164        }
1165    
1166        private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize,
1167                List<MessageReference> toExpire) throws Exception {
1168            for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
1169                QueueMessageReference ref = (QueueMessageReference) i.next();
1170                if (ref.isExpired()) {
1171                    toExpire.add(ref);
1172                } else if (l.contains(ref.getMessage()) == false) {
1173                    l.add(ref.getMessage());
1174                }
1175            }
1176        }
1177    
1178        public QueueMessageReference getMessage(String id) {
1179            MessageId msgId = new MessageId(id);
1180            pagedInMessagesLock.readLock().lock();
1181            try {
1182                QueueMessageReference ref = this.pagedInMessages.get(msgId);
1183                if (ref != null) {
1184                    return ref;
1185                }
1186            } finally {
1187                pagedInMessagesLock.readLock().unlock();
1188            }
1189            messagesLock.readLock().lock();
1190            try{
1191                try {
1192                    messages.reset();
1193                    while (messages.hasNext()) {
1194                        MessageReference mr = messages.next();
1195                        QueueMessageReference qmr = createMessageReference(mr.getMessage());
1196                        qmr.decrementReferenceCount();
1197                        messages.rollback(qmr.getMessageId());
1198                        if (msgId.equals(qmr.getMessageId())) {
1199                            return qmr;
1200                        }
1201                    }
1202                } finally {
1203                    messages.release();
1204                }
1205            }finally {
1206                messagesLock.readLock().unlock();
1207            }
1208            return null;
1209        }
1210    
1211        public void purge() throws Exception {
1212            ConnectionContext c = createConnectionContext();
1213            List<MessageReference> list = null;
1214            do {
1215                doPageIn(true, false);  // signal no expiry processing needed.
1216                pagedInMessagesLock.readLock().lock();
1217                try {
1218                    list = new ArrayList<MessageReference>(pagedInMessages.values());
1219                }finally {
1220                    pagedInMessagesLock.readLock().unlock();
1221                }
1222    
1223                for (MessageReference ref : list) {
1224                    try {
1225                        QueueMessageReference r = (QueueMessageReference) ref;
1226                        removeMessage(c, r);
1227                    } catch (IOException e) {
1228                    }
1229                }
1230                // don't spin/hang if stats are out and there is nothing left in the
1231                // store
1232            } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
1233    
1234            if (this.destinationStatistics.getMessages().getCount() > 0) {
1235                LOG.warn("{} after purge complete, message count stats report: {}", getActiveMQDestination().getQualifiedName(), this.destinationStatistics.getMessages().getCount());
1236            }
1237            gc();
1238            this.destinationStatistics.getMessages().setCount(0);
1239            getMessages().clear();
1240        }
1241    
1242        @Override
1243        public void clearPendingMessages() {
1244            messagesLock.writeLock().lock();
1245            try {
1246                if (store != null) {
1247                    store.resetBatching();
1248                }
1249                messages.gc();
1250                messages.reset();
1251                asyncWakeup();
1252            } finally {
1253                messagesLock.writeLock().unlock();
1254            }
1255        }
1256    
1257        /**
1258         * Removes the message matching the given messageId
1259         */
1260        public boolean removeMessage(String messageId) throws Exception {
1261            return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
1262        }
1263    
1264        /**
1265         * Removes the messages matching the given selector
1266         *
1267         * @return the number of messages removed
1268         */
1269        public int removeMatchingMessages(String selector) throws Exception {
1270            return removeMatchingMessages(selector, -1);
1271        }
1272    
1273        /**
1274         * Removes the messages matching the given selector up to the maximum number
1275         * of matched messages
1276         *
1277         * @return the number of messages removed
1278         */
1279        public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
1280            return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
1281        }
1282    
1283        /**
1284         * Removes the messages matching the given filter up to the maximum number
1285         * of matched messages
1286         *
1287         * @return the number of messages removed
1288         */
1289        public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
1290            int movedCounter = 0;
1291            Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1292            ConnectionContext context = createConnectionContext();
1293            do {
1294                doPageIn(true);
1295                pagedInMessagesLock.readLock().lock();
1296                try {
1297                    set.addAll(pagedInMessages.values());
1298                } finally {
1299                    pagedInMessagesLock.readLock().unlock();
1300                }
1301                List<MessageReference> list = new ArrayList<MessageReference>(set);
1302                for (MessageReference ref : list) {
1303                    IndirectMessageReference r = (IndirectMessageReference) ref;
1304                    if (filter.evaluate(context, r)) {
1305    
1306                        removeMessage(context, r);
1307                        set.remove(r);
1308                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1309                            return movedCounter;
1310                        }
1311                    }
1312                }
1313            } while (set.size() < this.destinationStatistics.getMessages().getCount());
1314            return movedCounter;
1315        }
1316    
1317        /**
1318         * Copies the message matching the given messageId
1319         */
1320        public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1321                throws Exception {
1322            return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
1323        }
1324    
1325        /**
1326         * Copies the messages matching the given selector
1327         *
1328         * @return the number of messages copied
1329         */
1330        public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1331                throws Exception {
1332            return copyMatchingMessagesTo(context, selector, dest, -1);
1333        }
1334    
1335        /**
1336         * Copies the messages matching the given selector up to the maximum number
1337         * of matched messages
1338         *
1339         * @return the number of messages copied
1340         */
1341        public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1342                int maximumMessages) throws Exception {
1343            return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
1344        }
1345    
1346        /**
1347         * Copies the messages matching the given filter up to the maximum number of
1348         * matched messages
1349         *
1350         * @return the number of messages copied
1351         */
1352        public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
1353                int maximumMessages) throws Exception {
1354            int movedCounter = 0;
1355            int count = 0;
1356            Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1357            do {
1358                int oldMaxSize = getMaxPageSize();
1359                setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
1360                doPageIn(true);
1361                setMaxPageSize(oldMaxSize);
1362                pagedInMessagesLock.readLock().lock();
1363                try {
1364                    set.addAll(pagedInMessages.values());
1365                } finally {
1366                    pagedInMessagesLock.readLock().unlock();
1367                }
1368                List<MessageReference> list = new ArrayList<MessageReference>(set);
1369                for (MessageReference ref : list) {
1370                    IndirectMessageReference r = (IndirectMessageReference) ref;
1371                    if (filter.evaluate(context, r)) {
1372    
1373                        r.incrementReferenceCount();
1374                        try {
1375                            Message m = r.getMessage();
1376                            BrokerSupport.resend(context, m, dest);
1377                            if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1378                                return movedCounter;
1379                            }
1380                        } finally {
1381                            r.decrementReferenceCount();
1382                        }
1383                    }
1384                    count++;
1385                }
1386            } while (count < this.destinationStatistics.getMessages().getCount());
1387            return movedCounter;
1388        }
1389    
1390        /**
1391         * Move a message
1392         *
1393         * @param context
1394         *            connection context
1395         * @param m
1396         *            QueueMessageReference
1397         * @param dest
1398         *            ActiveMQDestination
1399         * @throws Exception
1400         */
1401        public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
1402            BrokerSupport.resend(context, m.getMessage(), dest);
1403            removeMessage(context, m);
1404            messagesLock.writeLock().lock();
1405            try {
1406                messages.rollback(m.getMessageId());
1407                if (isDLQ()) {
1408                    DeadLetterStrategy stratagy = getDeadLetterStrategy();
1409                    stratagy.rollback(m.getMessage());
1410                }
1411            } finally {
1412                messagesLock.writeLock().unlock();
1413            }
1414            return true;
1415        }
1416    
1417        /**
1418         * Moves the message matching the given messageId
1419         */
1420        public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1421                throws Exception {
1422            return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
1423        }
1424    
1425        /**
1426         * Moves the messages matching the given selector
1427         *
1428         * @return the number of messages removed
1429         */
1430        public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1431                throws Exception {
1432            return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
1433        }
1434    
1435        /**
1436         * Moves the messages matching the given selector up to the maximum number
1437         * of matched messages
1438         */
1439        public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1440                int maximumMessages) throws Exception {
1441            return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
1442        }
1443    
1444        /**
1445         * Moves the messages matching the given filter up to the maximum number of
1446         * matched messages
1447         */
1448        public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
1449                ActiveMQDestination dest, int maximumMessages) throws Exception {
1450            int movedCounter = 0;
1451            Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
1452            do {
1453                doPageIn(true);
1454                pagedInMessagesLock.readLock().lock();
1455                try {
1456                    set.addAll(pagedInMessages.values());
1457                } finally {
1458                    pagedInMessagesLock.readLock().unlock();
1459                }
1460                List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
1461                for (QueueMessageReference ref : list) {
1462                    if (filter.evaluate(context, ref)) {
1463                        // We should only move messages that can be locked.
1464                        moveMessageTo(context, ref, dest);
1465                        set.remove(ref);
1466                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1467                            return movedCounter;
1468                        }
1469                    }
1470                }
1471            } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
1472            return movedCounter;
1473        }
1474    
1475        public int retryMessages(ConnectionContext context, int maximumMessages) throws Exception {
1476            if (!isDLQ()) {
1477                throw new Exception("Retry of message is only possible on Dead Letter Queues!");
1478            }
1479            int restoredCounter = 0;
1480            Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
1481            do {
1482                doPageIn(true);
1483                pagedInMessagesLock.readLock().lock();
1484                try {
1485                    set.addAll(pagedInMessages.values());
1486                } finally {
1487                    pagedInMessagesLock.readLock().unlock();
1488                }
1489                List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
1490                for (QueueMessageReference ref : list) {
1491                    if (ref.getMessage().getOriginalDestination() != null) {
1492    
1493                        moveMessageTo(context, ref, ref.getMessage().getOriginalDestination());
1494                        set.remove(ref);
1495                        if (++restoredCounter >= maximumMessages && maximumMessages > 0) {
1496                            return restoredCounter;
1497                        }
1498                    }
1499                }
1500            } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
1501            return restoredCounter;
1502        }
1503    
    /**
     * Performs one dispatch pass for this queue while holding
     * {@code iteratingMutex}. In order, the pass:
     * <ol>
     * <li>runs the operations queued in {@code messagesWaitingForSpace} for as
     * long as memory usage is not full;</li>
     * <li>when the {@code firstConsumer} flag is set (it is cleared here),
     * optionally delays dispatch according to
     * {@code consumersBeforeDispatchStarts} and
     * {@code timeBeforeDispatchStarts};</li>
     * <li>pages messages in when the cursor or the pending-dispatch list is
     * non-empty, when browsers are registered, or when redelivered messages
     * are waiting;</li>
     * <li>offers the currently paged-in messages to every registered browser
     * and retires browsers that received nothing new;</li>
     * <li>consumes one pending wakeup, if any.</li>
     * </ol>
     *
     * @return true if we would like to iterate again, i.e. pending wakeups
     *         remain after this pass
     * @see org.apache.activemq.thread.Task#iterate()
     */
    @Override
    public boolean iterate() {
        // Tag log output produced during this pass with the destination name.
        MDC.put("activemq.destination", getName());
        boolean pageInMoreMessages = false;
        synchronized (iteratingMutex) {

            // If optimize dispatch is on or this is a slave this method could be called recursively
            // we set this state value to short-circuit wakeup in those cases to avoid that as it
            // could lead to errors.
            // NOTE(review): this flag and the MDC entry are only reset on the
            // normal path at the end of the method, not in a finally block -
            // confirm that nothing below can throw past this point.
            iterationRunning = true;

            // do early to allow dispatch of these waiting messages
            synchronized (messagesWaitingForSpace) {
                Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
                while (it.hasNext()) {
                    if (!memoryUsage.isFull()) {
                        Runnable op = it.next();
                        it.remove();
                        op.run();
                    } else {
                        // Still full: ask to be notified when space frees up
                        // and leave the remaining operations queued.
                        registerCallbackForNotFullNotification();
                        break;
                    }
                }
            }

            if (firstConsumer) {
                firstConsumer = false;
                try {
                    if (consumersBeforeDispatchStarts > 0) {
                        int timeout = 1000; // wait one second by default if
                                            // consumer count isn't reached
                        if (timeBeforeDispatchStarts > 0) {
                            timeout = timeBeforeDispatchStarts;
                        }
                        if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                            LOG.debug("{} consumers subscribed. Starting dispatch.", consumers.size());
                        } else {
                            LOG.debug("{} ms elapsed and {} consumers subscribed. Starting dispatch.", timeout, consumers.size());
                        }
                    }
                    // Pure time-based delay: wait on the mutex (releasing it
                    // meanwhile) when no consumer threshold is configured.
                    if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
                        iteratingMutex.wait(timeBeforeDispatchStarts);
                        LOG.debug("{} ms elapsed. Starting dispatch.", timeBeforeDispatchStarts);
                    }
                } catch (Exception e) {
                    // NOTE(review): this also catches InterruptedException;
                    // only the message is logged and the interrupt status is
                    // not restored.
                    LOG.error(e.toString());
                }
            }

            // Anything left in the cursor means there is more to page in.
            messagesLock.readLock().lock();
            try{
                pageInMoreMessages |= !messages.isEmpty();
            } finally {
                messagesLock.readLock().unlock();
            }

            // Already paged-in but undispatched messages also need a pass.
            pagedInPendingDispatchLock.readLock().lock();
            try {
                pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
            } finally {
                pagedInPendingDispatchLock.readLock().unlock();
            }

            // Perhaps we should page always into the pagedInPendingDispatch
            // list if
            // !messages.isEmpty(), and then if
            // !pagedInPendingDispatch.isEmpty()
            // then we do a dispatch.
            boolean hasBrowsers = browserDispatches.size() > 0;

            if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) {
                try {
                    pageInMessages(hasBrowsers);
                } catch (Throwable e) {
                    LOG.error("Failed to page in more queue messages ", e);
                }
            }

            if (hasBrowsers) {
                // Snapshot the paged-in messages under the read lock so the
                // browsers can be fed without holding it.
                ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
                pagedInMessagesLock.readLock().lock();
                try{
                    alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
                }finally {
                    pagedInMessagesLock.readLock().unlock();
                }

                Iterator<BrowserDispatch> browsers = browserDispatches.iterator();
                while (browsers.hasNext()) {
                    BrowserDispatch browserDispatch = browsers.next();
                    try {
                        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                        msgContext.setDestination(destination);

                        QueueBrowserSubscription browser = browserDispatch.getBrowser();

                        LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size());
                        boolean added = false;
                        for (QueueMessageReference node : alreadyDispatchedMessages) {
                            // Skip acked messages and ones this browser has
                            // already seen.
                            if (!node.isAcked() && !browser.isDuplicate(node.getMessageId())) {
                                msgContext.setMessageReference(node);
                                if (browser.matches(node, msgContext)) {
                                    browser.add(node);
                                    added = true;
                                }
                            }
                        }
                        // are we done browsing? no new messages paged
                        if (!added) {
                            browser.decrementQueueRef();
                            browserDispatches.remove(browserDispatch);
                        }
                    } catch (Exception e) {
                        LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);
                    }
                }
            }

            // This pass accounts for one of the requested wakeups.
            if (pendingWakeups.get() > 0) {
                pendingWakeups.decrementAndGet();
            }
            MDC.remove("activemq.destination");
            iterationRunning = false;

            return pendingWakeups.get() > 0;
        }
    }
1636    
1637        protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
1638            return new MessageReferenceFilter() {
1639                @Override
1640                public boolean evaluate(ConnectionContext context, MessageReference r) {
1641                    return messageId.equals(r.getMessageId().toString());
1642                }
1643    
1644                @Override
1645                public String toString() {
1646                    return "MessageIdFilter: " + messageId;
1647                }
1648            };
1649        }
1650    
1651        protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {
1652    
1653            if (selector == null || selector.isEmpty()) {
1654                return new MessageReferenceFilter() {
1655    
1656                    @Override
1657                    public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException {
1658                        return true;
1659                    }
1660                };
1661            }
1662    
1663            final BooleanExpression selectorExpression = SelectorParser.parse(selector);
1664    
1665            return new MessageReferenceFilter() {
1666                @Override
1667                public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
1668                    MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
1669    
1670                    messageEvaluationContext.setMessageReference(r);
1671                    if (messageEvaluationContext.getDestination() == null) {
1672                        messageEvaluationContext.setDestination(getActiveMQDestination());
1673                    }
1674    
1675                    return selectorExpression.matches(messageEvaluationContext);
1676                }
1677            };
1678        }
1679    
1680        protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
1681            removeMessage(c, null, r);
1682            pagedInPendingDispatchLock.writeLock().lock();
1683            try {
1684                pagedInPendingDispatch.remove(r);
1685            } finally {
1686                pagedInPendingDispatchLock.writeLock().unlock();
1687            }
1688        }
1689    
1690        protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
1691            MessageAck ack = new MessageAck();
1692            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
1693            ack.setDestination(destination);
1694            ack.setMessageID(r.getMessageId());
1695            removeMessage(c, subs, r, ack);
1696        }
1697    
1698        protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
1699                MessageAck ack) throws IOException {
1700            reference.setAcked(true);
1701            // This sends the ack the the journal..
1702            if (!ack.isInTransaction()) {
1703                acknowledge(context, sub, ack, reference);
1704                getDestinationStatistics().getDequeues().increment();
1705                dropMessage(reference);
1706            } else {
1707                try {
1708                    acknowledge(context, sub, ack, reference);
1709                } finally {
1710                    context.getTransaction().addSynchronization(new Synchronization() {
1711    
1712                        @Override
1713                        public void afterCommit() throws Exception {
1714                            getDestinationStatistics().getDequeues().increment();
1715                            dropMessage(reference);
1716                            wakeup();
1717                        }
1718    
1719                        @Override
1720                        public void afterRollback() throws Exception {
1721                            reference.setAcked(false);
1722                            wakeup();
1723                        }
1724                    });
1725                }
1726            }
1727            if (ack.isPoisonAck() || (sub != null && sub.getConsumerInfo().isNetworkSubscription())) {
1728                // message gone to DLQ, is ok to allow redelivery
1729                messagesLock.writeLock().lock();
1730                try {
1731                    messages.rollback(reference.getMessageId());
1732                } finally {
1733                    messagesLock.writeLock().unlock();
1734                }
1735            }
1736    
1737        }
1738    
1739        private void dropMessage(QueueMessageReference reference) {
1740            if (!reference.isDropped()) {
1741                reference.drop();
1742                destinationStatistics.getMessages().decrement();
1743                pagedInMessagesLock.writeLock().lock();
1744                try {
1745                    pagedInMessages.remove(reference.getMessageId());
1746                } finally {
1747                    pagedInMessagesLock.writeLock().unlock();
1748                }
1749            }
1750        }
1751    
1752        public void messageExpired(ConnectionContext context, MessageReference reference) {
1753            messageExpired(context, null, reference);
1754        }
1755    
1756        @Override
1757        public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
1758            LOG.debug("message expired: {}", reference);
1759            broker.messageExpired(context, reference, subs);
1760            destinationStatistics.getExpired().increment();
1761            try {
1762                removeMessage(context, subs, (QueueMessageReference) reference);
1763                messagesLock.writeLock().lock();
1764                try {
1765                    messages.rollback(reference.getMessageId());
1766                } finally {
1767                    messagesLock.writeLock().unlock();
1768                }
1769            } catch (IOException e) {
1770                LOG.error("Failed to remove expired Message from the store ", e);
1771            }
1772        }
1773    
1774        final void sendMessage(final Message msg) throws Exception {
1775            messagesLock.writeLock().lock();
1776            try {
1777                messages.addMessageLast(msg);
1778            } finally {
1779                messagesLock.writeLock().unlock();
1780            }
1781        }
1782    
1783        final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
1784            destinationStatistics.getEnqueues().increment();
1785            destinationStatistics.getMessages().increment();
1786            destinationStatistics.getMessageSize().addSize(msg.getSize());
1787            messageDelivered(context, msg);
1788            consumersLock.readLock().lock();
1789            try {
1790                if (consumers.isEmpty()) {
1791                    onMessageWithNoConsumers(context, msg);
1792                }
1793            }finally {
1794                consumersLock.readLock().unlock();
1795            }
1796            LOG.debug("{} Message {} sent to {}", new Object[]{ broker.getBrokerName(), msg.getMessageId(), this.destination });
1797            wakeup();
1798        }
1799    
1800        @Override
1801        public void wakeup() {
1802            if (optimizedDispatch && !iterationRunning) {
1803                iterate();
1804                pendingWakeups.incrementAndGet();
1805            } else {
1806                asyncWakeup();
1807            }
1808        }
1809    
1810        private void asyncWakeup() {
1811            try {
1812                pendingWakeups.incrementAndGet();
1813                this.taskRunner.wakeup();
1814            } catch (InterruptedException e) {
1815                LOG.warn("Async task runner failed to wakeup ", e);
1816            }
1817        }
1818    
1819        private void doPageIn(boolean force) throws Exception {
1820            doPageIn(force, true);
1821        }
1822    
1823        private void doPageIn(boolean force, boolean processExpired) throws Exception {
1824            PendingList newlyPaged = doPageInForDispatch(force, processExpired);
1825            pagedInPendingDispatchLock.writeLock().lock();
1826            try {
1827                if (pagedInPendingDispatch.isEmpty()) {
1828                    pagedInPendingDispatch.addAll(newlyPaged);
1829    
1830                } else {
1831                    for (MessageReference qmr : newlyPaged) {
1832                        if (!pagedInPendingDispatch.contains(qmr)) {
1833                            pagedInPendingDispatch.addMessageLast(qmr);
1834                        }
1835                    }
1836                }
1837            } finally {
1838                pagedInPendingDispatchLock.writeLock().unlock();
1839            }
1840        }
1841    
    /**
     * Pulls up to one page of messages off the pending cursor and registers
     * the ones that are not yet paged in.
     * <p>
     * The page size is the smaller of the max page size and the cursor size;
     * with lazy dispatch and no {@code force} it is further capped by
     * {@code getConsumerMessageCountBeforeFull()}. Paging only happens when
     * that size is positive and either {@code force} is set or there are
     * consumers and the pending-dispatch list is below the max page size.
     *
     * @param force page in regardless of consumers and pending-dispatch size,
     *            and ignore the lazy-dispatch cap
     * @param processExpired when true, expired messages are filtered out
     *            while paging instead of being returned
     * @return the newly paged-in references; an empty list (never null) when
     *         nothing was paged in
     * @throws Exception if reading from the cursor fails
     */
    private PendingList doPageInForDispatch(boolean force, boolean processExpired) throws Exception {
        List<QueueMessageReference> result = null;
        PendingList resultList = null;

        // NOTE(review): messages.size() is read here without holding
        // messagesLock - confirm that this is intentional.
        int toPageIn = Math.min(getMaxPageSize(), messages.size());
        LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, enqueueCount: {}, dequeueCount: {}",
                new Object[]{
                        destination.getPhysicalName(),
                        toPageIn,
                        destinationStatistics.getInflight().getCount(),
                        pagedInMessages.size(),
                        destinationStatistics.getEnqueues().getCount(),
                        destinationStatistics.getDequeues().getCount()
                });
        if (isLazyDispatch() && !force) {
            // Only page in the minimum number of messages which can be
            // dispatched immediately.
            toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
        }
        // Sample the pending-dispatch backlog under its read lock.
        int pagedInPendingSize = 0;
        pagedInPendingDispatchLock.readLock().lock();
        try {
            pagedInPendingSize = pagedInPendingDispatch.size();
        } finally {
            pagedInPendingDispatchLock.readLock().unlock();
        }
        if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
            int count = 0;
            result = new ArrayList<QueueMessageReference>(toPageIn);
            messagesLock.writeLock().lock();
            try {
                try {
                    messages.setMaxBatchSize(toPageIn);
                    messages.reset();
                    while (messages.hasNext() && count < toPageIn) {
                        // Take the next entry off the cursor and wrap its
                        // message in a fresh queue reference.
                        MessageReference node = messages.next();
                        messages.remove();

                        QueueMessageReference ref = createMessageReference(node.getMessage());
                        if (processExpired && ref.isExpired()) {
                            // Expired entries do not count towards the page.
                            if (broker.isExpired(ref)) {
                                messageExpired(createConnectionContext(), ref);
                            } else {
                                ref.decrementReferenceCount();
                            }
                        } else {
                            result.add(ref);
                            count++;
                        }
                    }
                } finally {
                    // Always release the cursor, even if iteration failed.
                    messages.release();
                }
            } finally {
                messagesLock.writeLock().unlock();
            }
            // Only add new messages, not already pagedIn to avoid multiple
            // dispatch attempts
            pagedInMessagesLock.writeLock().lock();
            try {
                if(isPrioritizedMessages()) {
                    resultList = new PrioritizedPendingList();
                } else {
                    resultList = new OrderedPendingList();
                }
                for (QueueMessageReference ref : result) {
                    if (!pagedInMessages.containsKey(ref.getMessageId())) {
                        pagedInMessages.put(ref.getMessageId(), ref);
                        resultList.addMessageLast(ref);
                    } else {
                        // Already paged in: discard this duplicate reference.
                        ref.decrementReferenceCount();
                    }
                }
            } finally {
                pagedInMessagesLock.writeLock().unlock();
            }
        } else {
            // Avoid return null list, if condition is not validated
            resultList = new OrderedPendingList();
        }

        return resultList;
    }
1925    
1926        private void doDispatch(PendingList list) throws Exception {
1927            boolean doWakeUp = false;
1928    
1929            pagedInPendingDispatchLock.writeLock().lock();
1930            try {
1931                if (!redeliveredWaitingDispatch.isEmpty()) {
1932                    // Try first to dispatch redelivered messages to keep an
1933                    // proper order
1934                    redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
1935                }
1936                if (!pagedInPendingDispatch.isEmpty()) {
1937                    // Next dispatch anything that had not been
1938                    // dispatched before.
1939                    pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
1940                }
1941                // and now see if we can dispatch the new stuff.. and append to
1942                // the pending
1943                // list anything that does not actually get dispatched.
1944                if (list != null && !list.isEmpty()) {
1945                    if (pagedInPendingDispatch.isEmpty()) {
1946                        pagedInPendingDispatch.addAll(doActualDispatch(list));
1947                    } else {
1948                        for (MessageReference qmr : list) {
1949                            if (!pagedInPendingDispatch.contains(qmr)) {
1950                                pagedInPendingDispatch.addMessageLast(qmr);
1951                            }
1952                        }
1953                        doWakeUp = true;
1954                    }
1955                }
1956            } finally {
1957                pagedInPendingDispatchLock.writeLock().unlock();
1958            }
1959    
1960            if (doWakeUp) {
1961                // avoid lock order contention
1962                asyncWakeup();
1963            }
1964        }
1965    
1966        /**
1967         * @return list of messages that could get dispatched to consumers if they
1968         *         were not full.
1969         */
1970        private PendingList doActualDispatch(PendingList list) throws Exception {
1971            List<Subscription> consumers;
1972            consumersLock.writeLock().lock();
1973    
1974            try {
1975                if (this.consumers.isEmpty()) {
1976                    // slave dispatch happens in processDispatchNotification
1977                    return list;
1978                }
1979                consumers = new ArrayList<Subscription>(this.consumers);
1980            } finally {
1981                consumersLock.writeLock().unlock();
1982            }
1983    
1984            Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
1985    
1986            for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
1987    
1988                MessageReference node = iterator.next();
1989                Subscription target = null;
1990                for (Subscription s : consumers) {
1991                    if (s instanceof QueueBrowserSubscription) {
1992                        continue;
1993                    }
1994                    if (!fullConsumers.contains(s)) {
1995                        if (!s.isFull()) {
1996                            if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
1997                                // Dispatch it.
1998                                s.add(node);
1999                                iterator.remove();
2000                                target = s;
2001                                break;
2002                            }
2003                        } else {
2004                            // no further dispatch of list to a full consumer to
2005                            // avoid out of order message receipt
2006                            fullConsumers.add(s);
2007                            LOG.trace("Subscription full {}", s);
2008                        }
2009                    }
2010                }
2011    
2012                if (target == null && node.isDropped()) {
2013                    iterator.remove();
2014                }
2015    
2016                // return if there are no consumers or all consumers are full
2017                if (target == null && consumers.size() == fullConsumers.size()) {
2018                    return list;
2019                }
2020    
2021                // If it got dispatched, rotate the consumer list to get round robin
2022                // distribution.
2023                if (target != null && !strictOrderDispatch && consumers.size() > 1
2024                        && !dispatchSelector.isExclusiveConsumer(target)) {
2025                    consumersLock.writeLock().lock();
2026                    try {
2027                        if (removeFromConsumerList(target)) {
2028                            addToConsumerList(target);
2029                            consumers = new ArrayList<Subscription>(this.consumers);
2030                        }
2031                    } finally {
2032                        consumersLock.writeLock().unlock();
2033                    }
2034                }
2035            }
2036    
2037            return list;
2038        }
2039    
2040        protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
2041            boolean result = true;
2042            // Keep message groups together.
2043            String groupId = node.getGroupID();
2044            int sequence = node.getGroupSequence();
2045            if (groupId != null) {
2046    
2047                MessageGroupMap messageGroupOwners = getMessageGroupOwners();
2048                // If we can own the first, then no-one else should own the
2049                // rest.
2050                if (sequence == 1) {
2051                    assignGroup(subscription, messageGroupOwners, node, groupId);
2052                } else {
2053    
2054                    // Make sure that the previous owner is still valid, we may
2055                    // need to become the new owner.
2056                    ConsumerId groupOwner;
2057    
2058                    groupOwner = messageGroupOwners.get(groupId);
2059                    if (groupOwner == null) {
2060                        assignGroup(subscription, messageGroupOwners, node, groupId);
2061                    } else {
2062                        if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
2063                            // A group sequence < 1 is an end of group signal.
2064                            if (sequence < 0) {
2065                                messageGroupOwners.removeGroup(groupId);
2066                                subscription.getConsumerInfo().setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);
2067                            }
2068                        } else {
2069                            result = false;
2070                        }
2071                    }
2072                }
2073            }
2074    
2075            return result;
2076        }
2077    
2078        protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
2079            messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
2080            Message message = n.getMessage();
2081            message.setJMSXGroupFirstForConsumer(true);
2082            subs.getConsumerInfo().setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1);
2083        }
2084    
2085        protected void pageInMessages(boolean force) throws Exception {
2086            doDispatch(doPageInForDispatch(force, true));
2087        }
2088    
2089        private void addToConsumerList(Subscription sub) {
2090            if (useConsumerPriority) {
2091                consumers.add(sub);
2092                Collections.sort(consumers, orderedCompare);
2093            } else {
2094                consumers.add(sub);
2095            }
2096        }
2097    
2098        private boolean removeFromConsumerList(Subscription sub) {
2099            return consumers.remove(sub);
2100        }
2101    
2102        private int getConsumerMessageCountBeforeFull() throws Exception {
2103            int total = 0;
2104            boolean zeroPrefetch = false;
2105            consumersLock.readLock().lock();
2106            try {
2107                for (Subscription s : consumers) {
2108                    zeroPrefetch |= s.getPrefetchSize() == 0;
2109                    int countBeforeFull = s.countBeforeFull();
2110                    total += countBeforeFull;
2111                }
2112            } finally {
2113                consumersLock.readLock().unlock();
2114            }
2115            if (total == 0 && zeroPrefetch) {
2116                total = 1;
2117            }
2118            return total;
2119        }
2120    
2121        /*
2122         * In slave mode, dispatch is ignored till we get this notification as the
2123         * dispatch process is non deterministic between master and slave. On a
2124         * notification, the actual dispatch to the subscription (as chosen by the
2125         * master) is completed. (non-Javadoc)
2126         * @see
2127         * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
2128         * (org.apache.activemq.command.MessageDispatchNotification)
2129         */
2130        @Override
2131        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
2132            // do dispatch
2133            Subscription sub = getMatchingSubscription(messageDispatchNotification);
2134            if (sub != null) {
2135                MessageReference message = getMatchingMessage(messageDispatchNotification);
2136                sub.add(message);
2137                sub.processMessageDispatchNotification(messageDispatchNotification);
2138            }
2139        }
2140    
2141        private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
2142                throws Exception {
2143            QueueMessageReference message = null;
2144            MessageId messageId = messageDispatchNotification.getMessageId();
2145    
2146            pagedInPendingDispatchLock.writeLock().lock();
2147            try {
2148                for (MessageReference ref : pagedInPendingDispatch) {
2149                    if (messageId.equals(ref.getMessageId())) {
2150                        message = (QueueMessageReference)ref;
2151                        pagedInPendingDispatch.remove(ref);
2152                        break;
2153                    }
2154                }
2155            } finally {
2156                pagedInPendingDispatchLock.writeLock().unlock();
2157            }
2158    
2159            if (message == null) {
2160                pagedInMessagesLock.readLock().lock();
2161                try {
2162                    message = pagedInMessages.get(messageId);
2163                } finally {
2164                    pagedInMessagesLock.readLock().unlock();
2165                }
2166            }
2167    
2168            if (message == null) {
2169                messagesLock.writeLock().lock();
2170                try {
2171                    try {
2172                        messages.setMaxBatchSize(getMaxPageSize());
2173                        messages.reset();
2174                        while (messages.hasNext()) {
2175                            MessageReference node = messages.next();
2176                            messages.remove();
2177                            if (messageId.equals(node.getMessageId())) {
2178                                message = this.createMessageReference(node.getMessage());
2179                                break;
2180                            }
2181                        }
2182                    } finally {
2183                        messages.release();
2184                    }
2185                } finally {
2186                    messagesLock.writeLock().unlock();
2187                }
2188            }
2189    
2190            if (message == null) {
2191                Message msg = loadMessage(messageId);
2192                if (msg != null) {
2193                    message = this.createMessageReference(msg);
2194                }
2195            }
2196    
2197            if (message == null) {
2198                throw new JMSException("Slave broker out of sync with master - Message: "
2199                        + messageDispatchNotification.getMessageId() + " on "
2200                        + messageDispatchNotification.getDestination() + " does not exist among pending("
2201                        + pagedInPendingDispatch.size() + ") for subscription: "
2202                        + messageDispatchNotification.getConsumerId());
2203            }
2204            return message;
2205        }
2206    
2207        /**
2208         * Find a consumer that matches the id in the message dispatch notification
2209         *
2210         * @param messageDispatchNotification
2211         * @return sub or null if the subscription has been removed before dispatch
2212         * @throws JMSException
2213         */
2214        private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
2215                throws JMSException {
2216            Subscription sub = null;
2217            consumersLock.readLock().lock();
2218            try {
2219                for (Subscription s : consumers) {
2220                    if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
2221                        sub = s;
2222                        break;
2223                    }
2224                }
2225            } finally {
2226                consumersLock.readLock().unlock();
2227            }
2228            return sub;
2229        }
2230    
2231        @Override
2232        public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
2233            if (oldPercentUsage > newPercentUsage) {
2234                asyncWakeup();
2235            }
2236        }
2237    
2238        @Override
2239        protected Logger getLog() {
2240            return LOG;
2241        }
2242    
2243        protected boolean isOptimizeStorage(){
2244            boolean result = false;
2245            if (isDoOptimzeMessageStorage()){
2246                consumersLock.readLock().lock();
2247                try{
2248                    if (consumers.isEmpty()==false){
2249                        result = true;
2250                        for (Subscription s : consumers) {
2251                            if (s.getPrefetchSize()==0){
2252                                result = false;
2253                                break;
2254                            }
2255                            if (s.isSlowConsumer()){
2256                                result = false;
2257                                break;
2258                            }
2259                            if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
2260                                result = false;
2261                                break;
2262                            }
2263                        }
2264                    }
2265                } finally {
2266                    consumersLock.readLock().unlock();
2267                }
2268            }
2269            return result;
2270        }
2271    }