001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.Comparator;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.LinkedHashMap;
027    import java.util.LinkedHashSet;
028    import java.util.LinkedList;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    import java.util.concurrent.CancellationException;
033    import java.util.concurrent.ConcurrentLinkedQueue;
034    import java.util.concurrent.CountDownLatch;
035    import java.util.concurrent.DelayQueue;
036    import java.util.concurrent.Delayed;
037    import java.util.concurrent.ExecutorService;
038    import java.util.concurrent.Future;
039    import java.util.concurrent.TimeUnit;
040    import java.util.concurrent.atomic.AtomicLong;
041    import java.util.concurrent.locks.Lock;
042    import java.util.concurrent.locks.ReentrantLock;
043    import java.util.concurrent.locks.ReentrantReadWriteLock;
044    
045    import javax.jms.InvalidSelectorException;
046    import javax.jms.JMSException;
047    import javax.jms.ResourceAllocationException;
048    
049    import org.apache.activemq.broker.BrokerService;
050    import org.apache.activemq.broker.ConnectionContext;
051    import org.apache.activemq.broker.ProducerBrokerExchange;
052    import org.apache.activemq.broker.region.cursors.OrderedPendingList;
053    import org.apache.activemq.broker.region.cursors.PendingList;
054    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
055    import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
056    import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
057    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
058    import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
059    import org.apache.activemq.broker.region.group.MessageGroupMap;
060    import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
061    import org.apache.activemq.broker.region.policy.DispatchPolicy;
062    import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
063    import org.apache.activemq.broker.util.InsertionCountList;
064    import org.apache.activemq.command.ActiveMQDestination;
065    import org.apache.activemq.command.ActiveMQMessage;
066    import org.apache.activemq.command.ConsumerId;
067    import org.apache.activemq.command.ExceptionResponse;
068    import org.apache.activemq.command.Message;
069    import org.apache.activemq.command.MessageAck;
070    import org.apache.activemq.command.MessageDispatchNotification;
071    import org.apache.activemq.command.MessageId;
072    import org.apache.activemq.command.ProducerAck;
073    import org.apache.activemq.command.ProducerInfo;
074    import org.apache.activemq.command.Response;
075    import org.apache.activemq.filter.BooleanExpression;
076    import org.apache.activemq.filter.MessageEvaluationContext;
077    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
078    import org.apache.activemq.selector.SelectorParser;
079    import org.apache.activemq.state.ProducerState;
080    import org.apache.activemq.store.MessageRecoveryListener;
081    import org.apache.activemq.store.MessageStore;
082    import org.apache.activemq.thread.Task;
083    import org.apache.activemq.thread.TaskRunner;
084    import org.apache.activemq.thread.TaskRunnerFactory;
085    import org.apache.activemq.transaction.Synchronization;
086    import org.apache.activemq.usage.Usage;
087    import org.apache.activemq.usage.UsageListener;
088    import org.apache.activemq.util.BrokerSupport;
089    import org.apache.activemq.util.ThreadPoolUtils;
090    import org.slf4j.Logger;
091    import org.slf4j.LoggerFactory;
092    import org.slf4j.MDC;
093    
094    /**
095     * The Queue is a List of MessageEntry objects that are dispatched to matching
096     * subscriptions.
097     */
098    public class Queue extends BaseDestination implements Task, UsageListener {
099        protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
100        protected final TaskRunnerFactory taskFactory;
101        protected TaskRunner taskRunner;
102        private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
103        protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
104        private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
105        protected PendingMessageCursor messages;
106        private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
107        private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
108        // Messages that are paged in but have not yet been targeted at a
109        // subscription
110        private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
111        protected PendingList pagedInPendingDispatch = new OrderedPendingList();
112        protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
113        private MessageGroupMap messageGroupOwners;
114        private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
115        private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
116        final Lock sendLock = new ReentrantLock();
117        private ExecutorService executor;
118        private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();
119        private boolean useConsumerPriority = true;
120        private boolean strictOrderDispatch = false;
121        private final QueueDispatchSelector dispatchSelector;
122        private boolean optimizedDispatch = false;
123        private boolean iterationRunning = false;
124        private boolean firstConsumer = false;
125        private int timeBeforeDispatchStarts = 0;
126        private int consumersBeforeDispatchStarts = 0;
127        private CountDownLatch consumersBeforeStartsLatch;
128        private final AtomicLong pendingWakeups = new AtomicLong();
129        private boolean allConsumersExclusiveByDefault = false;
130    
131        private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
132            public void run() {
133                asyncWakeup();
134            }
135        };
136        private final Runnable expireMessagesTask = new Runnable() {
137            public void run() {
138                expireMessages();
139            }
140        };
141    
142        private final Object iteratingMutex = new Object();
143    
144        class TimeoutMessage implements Delayed {
145    
146            Message message;
147            ConnectionContext context;
148            long trigger;
149    
150            public TimeoutMessage(Message message, ConnectionContext context, long delay) {
151                this.message = message;
152                this.context = context;
153                this.trigger = System.currentTimeMillis() + delay;
154            }
155    
156            public long getDelay(TimeUnit unit) {
157                long n = trigger - System.currentTimeMillis();
158                return unit.convert(n, TimeUnit.MILLISECONDS);
159            }
160    
161            public int compareTo(Delayed delayed) {
162                long other = ((TimeoutMessage) delayed).trigger;
163                int returnValue;
164                if (this.trigger < other) {
165                    returnValue = -1;
166                } else if (this.trigger > other) {
167                    returnValue = 1;
168                } else {
169                    returnValue = 0;
170                }
171                return returnValue;
172            }
173    
174        }
175    
176        DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
177    
178        class FlowControlTimeoutTask extends Thread {
179    
180            @Override
181            public void run() {
182                TimeoutMessage timeout;
183                try {
184                    while (true) {
185                        timeout = flowControlTimeoutMessages.take();
186                        if (timeout != null) {
187                            synchronized (messagesWaitingForSpace) {
188                                if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
189                                    ExceptionResponse response = new ExceptionResponse(
190                                            new ResourceAllocationException(
191                                                    "Usage Manager Memory Limit reached. Stopping producer ("
192                                                            + timeout.message.getProducerId()
193                                                            + ") to prevent flooding "
194                                                            + getActiveMQDestination().getQualifiedName()
195                                                            + "."
196                                                            + " See http://activemq.apache.org/producer-flow-control.html for more info"));
197                                    response.setCorrelationId(timeout.message.getCommandId());
198                                    timeout.context.getConnection().dispatchAsync(response);
199                                }
200                            }
201                        }
202                    }
203                } catch (InterruptedException e) {
204                    if (LOG.isDebugEnabled()) {
205                        LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
206                    }
207                }
208            }
209        };
210    
211        private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
212    
213        private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
214    
215            public int compare(Subscription s1, Subscription s2) {
216                // We want the list sorted in descending order
217                int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
218                if (val == 0 && messageGroupOwners != null) {
219                    // then ascending order of assigned message groups to favour less loaded consumers
220                    // Long.compare in jdk7
221                    long x = s1.getConsumerInfo().getLastDeliveredSequenceId();
222                    long y = s2.getConsumerInfo().getLastDeliveredSequenceId();
223                    val = (x < y) ? -1 : ((x == y) ? 0 : 1);
224                }
225                return val;
226            }
227        };
228    
229        public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
230                DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
231            super(brokerService, store, destination, parentStats);
232            this.taskFactory = taskFactory;
233            this.dispatchSelector = new QueueDispatchSelector(destination);
234        }
235    
236        public List<Subscription> getConsumers() {
237            consumersLock.readLock().lock();
238            try {
239                return new ArrayList<Subscription>(consumers);
240            }finally {
241                consumersLock.readLock().unlock();
242            }
243        }
244    
245        // make the queue easily visible in the debugger from its task runner
246        // threads
247        final class QueueThread extends Thread {
248            final Queue queue;
249    
250            public QueueThread(Runnable runnable, String name, Queue queue) {
251                super(runnable, name);
252                this.queue = queue;
253            }
254        }
255    
256        class BatchMessageRecoveryListener implements MessageRecoveryListener {
257            final LinkedList<Message> toExpire = new LinkedList<Message>();
258            final double totalMessageCount;
259            int recoveredAccumulator = 0;
260            int currentBatchCount;
261    
262            BatchMessageRecoveryListener(int totalMessageCount) {
263                this.totalMessageCount = totalMessageCount;
264                currentBatchCount = recoveredAccumulator;
265            }
266    
267            public boolean recoverMessage(Message message) {
268                recoveredAccumulator++;
269                if (LOG.isInfoEnabled() && (recoveredAccumulator % 10000) == 0) {
270                    LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered "
271                            + recoveredAccumulator + " messages. " +
272                            (int) (recoveredAccumulator * 100 / totalMessageCount) + "% complete");
273                }
274                // Message could have expired while it was being
275                // loaded..
276                if (message.isExpired() && broker.isExpired(message)) {
277                    toExpire.add(message);
278                    return true;
279                }
280                if (hasSpace()) {
281                    message.setRegionDestination(Queue.this);
282                    messagesLock.writeLock().lock();
283                    try {
284                        try {
285                            messages.addMessageLast(message);
286                        } catch (Exception e) {
287                            LOG.error("Failed to add message to cursor", e);
288                        }
289                    } finally {
290                        messagesLock.writeLock().unlock();
291                    }
292                    destinationStatistics.getMessages().increment();
293                    return true;
294                }
295                return false;
296            }
297    
298            public boolean recoverMessageReference(MessageId messageReference) throws Exception {
299                throw new RuntimeException("Should not be called.");
300            }
301    
302            public boolean hasSpace() {
303                return true;
304            }
305    
306            public boolean isDuplicate(MessageId id) {
307                return false;
308            }
309    
310            public void reset() {
311                currentBatchCount = recoveredAccumulator;
312            }
313    
314            public void processExpired() {
315                for (Message message: toExpire) {
316                    messageExpired(createConnectionContext(), createMessageReference(message));
317                    // drop message will decrement so counter
318                    // balance here
319                    destinationStatistics.getMessages().increment();
320                }
321                toExpire.clear();
322            }
323    
324            public boolean done() {
325                return currentBatchCount == recoveredAccumulator;
326            }
327        }
328    
329        @Override
330        public void setPrioritizedMessages(boolean prioritizedMessages) {
331            super.setPrioritizedMessages(prioritizedMessages);
332    
333            if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
334                pagedInPendingDispatch = new PrioritizedPendingList();
335                redeliveredWaitingDispatch = new PrioritizedPendingList();
336            } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
337                pagedInPendingDispatch = new OrderedPendingList();
338                redeliveredWaitingDispatch = new OrderedPendingList();
339            }
340        }
341    
342        @Override
343        public void initialize() throws Exception {
344    
345            if (this.messages == null) {
346                if (destination.isTemporary() || broker == null || store == null) {
347                    this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
348                } else {
349                    this.messages = new StoreQueueCursor(broker, this);
350                }
351            }
352    
353            // If a VMPendingMessageCursor don't use the default Producer System
354            // Usage
355            // since it turns into a shared blocking queue which can lead to a
356            // network deadlock.
357            // If we are cursoring to disk..it's not and issue because it does not
358            // block due
359            // to large disk sizes.
360            if (messages instanceof VMPendingMessageCursor) {
361                this.systemUsage = brokerService.getSystemUsage();
362                memoryUsage.setParent(systemUsage.getMemoryUsage());
363            }
364    
365            this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
366    
367            super.initialize();
368            if (store != null) {
369                // Restore the persistent messages.
370                messages.setSystemUsage(systemUsage);
371                messages.setEnableAudit(isEnableAudit());
372                messages.setMaxAuditDepth(getMaxAuditDepth());
373                messages.setMaxProducersToAudit(getMaxProducersToAudit());
374                messages.setUseCache(isUseCache());
375                messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
376                final int messageCount = store.getMessageCount();
377                if (messageCount > 0 && messages.isRecoveryRequired()) {
378                    BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
379                    do {
380                       listener.reset();
381                       store.recoverNextMessages(getMaxPageSize(), listener);
382                       listener.processExpired();
383                   } while (!listener.done());
384                } else {
385                    destinationStatistics.getMessages().setCount(messageCount);
386                }
387            }
388        }
389    
390        /*
391         * Holder for subscription that needs attention on next iterate browser
392         * needs access to existing messages in the queue that have already been
393         * dispatched
394         */
395        class BrowserDispatch {
396            QueueBrowserSubscription browser;
397    
398            public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
399                browser = browserSubscription;
400                browser.incrementQueueRef();
401            }
402    
403            void done() {
404                try {
405                    browser.decrementQueueRef();
406                } catch (Exception e) {
407                    LOG.warn("decrement ref on browser: " + browser, e);
408                }
409            }
410    
411            public QueueBrowserSubscription getBrowser() {
412                return browser;
413            }
414        }
415    
416        ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
417    
418        public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
419            if (LOG.isDebugEnabled()) {
420                LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
421                        + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
422                        + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
423                        + getDestinationStatistics().getInflight().getCount());
424            }
425    
426            super.addSubscription(context, sub);
427            // synchronize with dispatch method so that no new messages are sent
428            // while setting up a subscription. avoid out of order messages,
429            // duplicates, etc.
430            pagedInPendingDispatchLock.writeLock().lock();
431            try {
432    
433                sub.add(context, this);
434    
435                // needs to be synchronized - so no contention with dispatching
436               // consumersLock.
437                consumersLock.writeLock().lock();
438                try {
439    
440                    // set a flag if this is a first consumer
441                    if (consumers.size() == 0) {
442                        firstConsumer = true;
443                        if (consumersBeforeDispatchStarts != 0) {
444                            consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
445                        }
446                    } else {
447                        if (consumersBeforeStartsLatch != null) {
448                            consumersBeforeStartsLatch.countDown();
449                        }
450                    }
451    
452                    addToConsumerList(sub);
453                    if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
454                        Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
455                        if (exclusiveConsumer == null) {
456                            exclusiveConsumer = sub;
457                        } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
458                            sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
459                            exclusiveConsumer = sub;
460                        }
461                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
462                    }
463                }finally {
464                    consumersLock.writeLock().unlock();
465                }
466    
467                if (sub instanceof QueueBrowserSubscription) {
468                    // tee up for dispatch in next iterate
469                    QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
470                    BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
471                    browserDispatches.add(browserDispatch);
472                }
473    
474                if (!(this.optimizedDispatch || isSlave())) {
475                    wakeup();
476                }
477            }finally {
478                pagedInPendingDispatchLock.writeLock().unlock();
479            }
480            if (this.optimizedDispatch || isSlave()) {
481                // Outside of dispatchLock() to maintain the lock hierarchy of
482                // iteratingMutex -> dispatchLock. - see
483                // https://issues.apache.org/activemq/browse/AMQ-1878
484                wakeup();
485            }
486        }
487    
488        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
489                throws Exception {
490            super.removeSubscription(context, sub, lastDeiveredSequenceId);
491            // synchronize with dispatch method so that no new messages are sent
492            // while removing up a subscription.
493            pagedInPendingDispatchLock.writeLock().lock();
494            try {
495                if (LOG.isDebugEnabled()) {
496                    LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
497                            + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
498                            + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
499                            + getDestinationStatistics().getInflight().getCount());
500                }
501                consumersLock.writeLock().lock();
502                try {
503                    removeFromConsumerList(sub);
504                    if (sub.getConsumerInfo().isExclusive()) {
505                        Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
506                        if (exclusiveConsumer == sub) {
507                            exclusiveConsumer = null;
508                            for (Subscription s : consumers) {
509                                if (s.getConsumerInfo().isExclusive()
510                                        && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
511                                                .getConsumerInfo().getPriority())) {
512                                    exclusiveConsumer = s;
513    
514                                }
515                            }
516                            dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
517                        }
518                    } else if (isAllConsumersExclusiveByDefault()) {
519                        Subscription exclusiveConsumer = null;
520                        for (Subscription s : consumers) {
521                            if (exclusiveConsumer == null
522                                    || s.getConsumerInfo().getPriority() > exclusiveConsumer
523                                    .getConsumerInfo().getPriority()) {
524                                exclusiveConsumer = s;
525                                    }
526                        }
527                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
528                    }
529                    ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
530                    getMessageGroupOwners().removeConsumer(consumerId);
531    
532                    // redeliver inflight messages
533    
534                    boolean markAsRedelivered = false;
535                    MessageReference lastDeliveredRef = null;
536                    List<MessageReference> unAckedMessages = sub.remove(context, this);
537    
538                    // locate last redelivered in unconsumed list (list in delivery rather than seq order)
539                    if (lastDeiveredSequenceId != 0) {
540                        for (MessageReference ref : unAckedMessages) {
541                            if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
542                                lastDeliveredRef = ref;
543                                markAsRedelivered = true;
544                                if (LOG.isDebugEnabled()) {
545                                    LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId());
546                                }
547                                break;
548                            }
549                        }
550                    }
551                    for (MessageReference ref : unAckedMessages) {
552                        QueueMessageReference qmr = (QueueMessageReference) ref;
553                        if (qmr.getLockOwner() == sub) {
554                            qmr.unlock();
555    
556                            // have no delivery information
557                            if (lastDeiveredSequenceId == 0) {
558                                qmr.incrementRedeliveryCounter();
559                            } else {
560                                if (markAsRedelivered) {
561                                    qmr.incrementRedeliveryCounter();
562                                }
563                                if (ref == lastDeliveredRef) {
564                                    // all that follow were not redelivered
565                                    markAsRedelivered = false;
566                                }
567                            }
568                        }
569                        redeliveredWaitingDispatch.addMessageLast(qmr);
570                    }
571                    if (!redeliveredWaitingDispatch.isEmpty()) {
572                        doDispatch(new OrderedPendingList());
573                    }
574                }finally {
575                    consumersLock.writeLock().unlock();
576                }
577                if (!(this.optimizedDispatch || isSlave())) {
578                    wakeup();
579                }
580            }finally {
581                pagedInPendingDispatchLock.writeLock().unlock();
582            }
583            if (this.optimizedDispatch || isSlave()) {
584                // Outside of dispatchLock() to maintain the lock hierarchy of
585                // iteratingMutex -> dispatchLock. - see
586                // https://issues.apache.org/activemq/browse/AMQ-1878
587                wakeup();
588            }
589        }
590    
591        public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
592            final ConnectionContext context = producerExchange.getConnectionContext();
593            // There is delay between the client sending it and it arriving at the
594            // destination.. it may have expired.
595            message.setRegionDestination(this);
596            ProducerState state = producerExchange.getProducerState();
597            if (state == null) {
598                LOG.warn("Send failed for: " + message + ",  missing producer state for: " + producerExchange);
599                throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
600            }
601            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
602            final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
603                    && !context.isInRecoveryMode();
604            if (message.isExpired()) {
605                // message not stored - or added to stats yet - so chuck here
606                broker.getRoot().messageExpired(context, message, null);
607                if (sendProducerAck) {
608                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
609                    context.getConnection().dispatchAsync(ack);
610                }
611                return;
612            }
613            if (memoryUsage.isFull()) {
614                isFull(context, memoryUsage);
615                fastProducer(context, producerInfo);
616                if (isProducerFlowControl() && context.isProducerFlowControl()) {
617                    if (warnOnProducerFlowControl) {
618                        warnOnProducerFlowControl = false;
619                        LOG
620                                .info("Usage Manager Memory Limit ("
621                                        + memoryUsage.getLimit()
622                                        + ") reached on "
623                                        + getActiveMQDestination().getQualifiedName()
624                                        + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
625                                        + " See http://activemq.apache.org/producer-flow-control.html for more info");
626                    }
627    
628                    if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
629                        throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
630                                + message.getProducerId() + ") to prevent flooding "
631                                + getActiveMQDestination().getQualifiedName() + "."
632                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
633                    }
634    
635                    // We can avoid blocking due to low usage if the producer is
636                    // sending
637                    // a sync message or if it is using a producer window
638                    if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
639                        // copy the exchange state since the context will be
640                        // modified while we are waiting
641                        // for space.
642                        final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
643                        synchronized (messagesWaitingForSpace) {
644                         // Start flow control timeout task
645                            // Prevent trying to start it multiple times
646                            if (!flowControlTimeoutTask.isAlive()) {
647                                flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task");
648                                flowControlTimeoutTask.start();
649                            }
650                            messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
651                                public void run() {
652    
653                                    try {
654                                        // While waiting for space to free up... the
655                                        // message may have expired.
656                                        if (message.isExpired()) {
657                                            LOG.error("expired waiting for space..");
658                                            broker.messageExpired(context, message, null);
659                                            destinationStatistics.getExpired().increment();
660                                        } else {
661                                            doMessageSend(producerExchangeCopy, message);
662                                        }
663    
664                                        if (sendProducerAck) {
665                                            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
666                                                    .getSize());
667                                            context.getConnection().dispatchAsync(ack);
668                                        } else {
669                                            Response response = new Response();
670                                            response.setCorrelationId(message.getCommandId());
671                                            context.getConnection().dispatchAsync(response);
672                                        }
673    
674                                    } catch (Exception e) {
675                                        if (!sendProducerAck && !context.isInRecoveryMode()) {
676                                            ExceptionResponse response = new ExceptionResponse(e);
677                                            response.setCorrelationId(message.getCommandId());
678                                            context.getConnection().dispatchAsync(response);
679                                        } else {
680                                            LOG.debug("unexpected exception on deferred send of :" + message, e);
681                                        }
682                                    }
683                                }
684                            });
685    
686                            if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
687                                flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
688                                        .getSendFailIfNoSpaceAfterTimeout()));
689                            }
690    
691                            registerCallbackForNotFullNotification();
692                            context.setDontSendReponse(true);
693                            return;
694                        }
695    
696                    } else {
697    
698                        if (memoryUsage.isFull()) {
699                            waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
700                                    + message.getProducerId() + ") stopped to prevent flooding "
701                                    + getActiveMQDestination().getQualifiedName() + "."
702                                    + " See http://activemq.apache.org/producer-flow-control.html for more info");
703                        }
704    
705                        // The usage manager could have delayed us by the time
706                        // we unblock the message could have expired..
707                        if (message.isExpired()) {
708                            if (LOG.isDebugEnabled()) {
709                                LOG.debug("Expired message: " + message);
710                            }
711                            broker.getRoot().messageExpired(context, message, null);
712                            return;
713                        }
714                    }
715                }
716            }
717            doMessageSend(producerExchange, message);
718            if (sendProducerAck) {
719                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
720                context.getConnection().dispatchAsync(ack);
721            }
722        }
723    
724        private void registerCallbackForNotFullNotification() {
725            // If the usage manager is not full, then the task will not
726            // get called..
727            if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
728                // so call it directly here.
729                sendMessagesWaitingForSpaceTask.run();
730            }
731        }
732    
733        void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
734                Exception {
735            final ConnectionContext context = producerExchange.getConnectionContext();
736            Future<Object> result = null;
737    
738            checkUsage(context, message);
739            sendLock.lockInterruptibly();
740            try {
741                if (store != null && message.isPersistent()) {
742                    message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
743                    if (messages.isCacheEnabled()) {
744                        result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
745                    } else {
746                        store.addMessage(context, message);
747                    }
748                    if (isReduceMemoryFootprint()) {
749                        message.clearMarshalledState();
750                    }
751                }
752                if (context.isInTransaction()) {
753                    // If this is a transacted message.. increase the usage now so that
754                    // a big TX does not blow up
755                    // our memory. This increment is decremented once the tx finishes..
756                    message.incrementReferenceCount();
757    
758                    context.getTransaction().addSynchronization(new Synchronization() {
759                        @Override
760                        public void afterCommit() throws Exception {
761                            sendLock.lockInterruptibly();
762                            try {
763                                // It could take while before we receive the commit
764                                // op, by that time the message could have expired..
765                                if (broker.isExpired(message)) {
766                                    broker.messageExpired(context, message, null);
767                                    destinationStatistics.getExpired().increment();
768                                    return;
769                                }
770                                sendMessage(message);
771                            } finally {
772                                sendLock.unlock();
773                                message.decrementReferenceCount();
774                            }
775                            messageSent(context, message);
776                        }
777                        @Override
778                        public void afterRollback() throws Exception {
779                            message.decrementReferenceCount();
780                        }
781                    });
782                } else {
783                    // Add to the pending list, this takes care of incrementing the
784                    // usage manager.
785                    sendMessage(message);
786                }
787            } finally {
788                sendLock.unlock();
789            }
790            if (!context.isInTransaction()) {
791                messageSent(context, message);
792            }
793            if (result != null && !result.isCancelled()) {
794                try {
795                    result.get();
796                } catch (CancellationException e) {
797                    // ignore - the task has been cancelled if the message
798                    // has already been deleted
799                }
800            }
801        }
802    
803        private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
804            if (message.isPersistent()) {
805                if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
806                    final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
807                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
808                        + message.getProducerId() + ") to prevent flooding "
809                        + getActiveMQDestination().getQualifiedName() + "."
810                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
811    
812                    waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
813                }
814            } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
815                final String logMessage = "Temp Store is Full ("
816                        + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
817                        +"). Stopping producer (" + message.getProducerId()
818                    + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
819                    + " See http://activemq.apache.org/producer-flow-control.html for more info";
820    
821                waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
822            }
823        }
824    
825        private void expireMessages() {
826            if (LOG.isDebugEnabled()) {
827                LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages ..");
828            }
829    
830            // just track the insertion count
831            List<Message> browsedMessages = new InsertionCountList<Message>();
832            doBrowse(browsedMessages, this.getMaxExpirePageSize());
833            asyncWakeup();
834            if (LOG.isDebugEnabled()) {
835                LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages done.");
836            }
837        }
838    
839        public void gc() {
840        }
841    
842        public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
843                throws IOException {
844            messageConsumed(context, node);
845            if (store != null && node.isPersistent()) {
846                store.removeAsyncMessage(context, convertToNonRangedAck(ack, node));
847            }
848        }
849    
850        Message loadMessage(MessageId messageId) throws IOException {
851            Message msg = null;
852            if (store != null) { // can be null for a temp q
853                msg = store.getMessage(messageId);
854                if (msg != null) {
855                    msg.setRegionDestination(this);
856                }
857            }
858            return msg;
859        }
860    
861        @Override
862        public String toString() {
863            int size = 0;
864            messagesLock.readLock().lock();
865            try{
866                size = messages.size();
867            }finally {
868                messagesLock.readLock().unlock();
869            }
870            return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
871                    + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
872                    + messageGroupOwners;
873        }
874    
875        public void start() throws Exception {
876            if (memoryUsage != null) {
877                memoryUsage.start();
878            }
879            if (systemUsage.getStoreUsage() != null) {
880                systemUsage.getStoreUsage().start();
881            }
882            systemUsage.getMemoryUsage().addUsageListener(this);
883            messages.start();
884            if (getExpireMessagesPeriod() > 0) {
885                scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
886            }
887            doPageIn(false);
888        }
889    
890        public void stop() throws Exception {
891            if (taskRunner != null) {
892                taskRunner.shutdown();
893            }
894            if (this.executor != null) {
895                ThreadPoolUtils.shutdownNow(executor);
896                executor = null;
897            }
898    
899            scheduler.cancel(expireMessagesTask);
900    
901            if (flowControlTimeoutTask.isAlive()) {
902                flowControlTimeoutTask.interrupt();
903            }
904    
905            if (messages != null) {
906                messages.stop();
907            }
908    
909            systemUsage.getMemoryUsage().removeUsageListener(this);
910            if (memoryUsage != null) {
911                memoryUsage.stop();
912            }
913            if (store != null) {
914                store.stop();
915            }
916        }
917    
918        // Properties
919        // -------------------------------------------------------------------------
920        @Override
921        public ActiveMQDestination getActiveMQDestination() {
922            return destination;
923        }
924    
925        public MessageGroupMap getMessageGroupOwners() {
926            if (messageGroupOwners == null) {
927                messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
928            }
929            return messageGroupOwners;
930        }
931    
932        public DispatchPolicy getDispatchPolicy() {
933            return dispatchPolicy;
934        }
935    
936        public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
937            this.dispatchPolicy = dispatchPolicy;
938        }
939    
940        public MessageGroupMapFactory getMessageGroupMapFactory() {
941            return messageGroupMapFactory;
942        }
943    
944        public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
945            this.messageGroupMapFactory = messageGroupMapFactory;
946        }
947    
948        public PendingMessageCursor getMessages() {
949            return this.messages;
950        }
951    
952        public void setMessages(PendingMessageCursor messages) {
953            this.messages = messages;
954        }
955    
956        public boolean isUseConsumerPriority() {
957            return useConsumerPriority;
958        }
959    
960        public void setUseConsumerPriority(boolean useConsumerPriority) {
961            this.useConsumerPriority = useConsumerPriority;
962        }
963    
964        public boolean isStrictOrderDispatch() {
965            return strictOrderDispatch;
966        }
967    
968        public void setStrictOrderDispatch(boolean strictOrderDispatch) {
969            this.strictOrderDispatch = strictOrderDispatch;
970        }
971    
972        public boolean isOptimizedDispatch() {
973            return optimizedDispatch;
974        }
975    
976        public void setOptimizedDispatch(boolean optimizedDispatch) {
977            this.optimizedDispatch = optimizedDispatch;
978        }
979    
980        public int getTimeBeforeDispatchStarts() {
981            return timeBeforeDispatchStarts;
982        }
983    
984        public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
985            this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
986        }
987    
988        public int getConsumersBeforeDispatchStarts() {
989            return consumersBeforeDispatchStarts;
990        }
991    
992        public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
993            this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
994        }
995    
996        public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
997            this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
998        }
999    
1000        public boolean isAllConsumersExclusiveByDefault() {
1001            return allConsumersExclusiveByDefault;
1002        }
1003    
1004    
1005        // Implementation methods
1006        // -------------------------------------------------------------------------
1007        private QueueMessageReference createMessageReference(Message message) {
1008            QueueMessageReference result = new IndirectMessageReference(message);
1009            return result;
1010        }
1011    
1012        public Message[] browse() {
1013            List<Message> browseList = new ArrayList<Message>();
1014            doBrowse(browseList, getMaxBrowsePageSize());
1015            return browseList.toArray(new Message[browseList.size()]);
1016        }
1017    
1018        public void doBrowse(List<Message> browseList, int max) {
1019            final ConnectionContext connectionContext = createConnectionContext();
1020            try {
1021                pageInMessages(!isUseCache());
1022                List<MessageReference> toExpire = new ArrayList<MessageReference>();
1023    
1024                pagedInPendingDispatchLock.writeLock().lock();
1025                try {
1026                    addAll(pagedInPendingDispatch.values(), browseList, max, toExpire);
1027                    for (MessageReference ref : toExpire) {
1028                        pagedInPendingDispatch.remove(ref);
1029                        if (broker.isExpired(ref)) {
1030                            if (LOG.isDebugEnabled()) {
1031                                LOG.debug("expiring from pagedInPending: " + ref);
1032                            }
1033                            messageExpired(connectionContext, ref);
1034                        }
1035                    }
1036                } finally {
1037                    pagedInPendingDispatchLock.writeLock().unlock();
1038                }
1039                toExpire.clear();
1040                pagedInMessagesLock.readLock().lock();
1041                try {
1042                    addAll(pagedInMessages.values(), browseList, max, toExpire);
1043                } finally {
1044                    pagedInMessagesLock.readLock().unlock();
1045                }
1046                for (MessageReference ref : toExpire) {
1047                    if (broker.isExpired(ref)) {
1048                        if (LOG.isDebugEnabled()) {
1049                            LOG.debug("expiring from pagedInMessages: " + ref);
1050                        }
1051                        messageExpired(connectionContext, ref);
1052                    } else {
1053                        pagedInMessagesLock.writeLock().lock();
1054                        try {
1055                            pagedInMessages.remove(ref.getMessageId());
1056                        } finally {
1057                            pagedInMessagesLock.writeLock().unlock();
1058                        }
1059                    }
1060                }
1061    
1062                if (browseList.size() < getMaxBrowsePageSize()) {
1063                    messagesLock.writeLock().lock();
1064                    try {
1065                        try {
1066                            messages.reset();
1067                            while (messages.hasNext() && browseList.size() < max) {
1068                                MessageReference node = messages.next();
1069                                if (node.isExpired()) {
1070                                    if (broker.isExpired(node)) {
1071                                        if (LOG.isDebugEnabled()) {
1072                                            LOG.debug("expiring from messages: " + node);
1073                                        }
1074                                        messageExpired(connectionContext, createMessageReference(node.getMessage()));
1075                                    }
1076                                    messages.remove();
1077                                } else {
1078                                    messages.rollback(node.getMessageId());
1079                                    if (browseList.contains(node.getMessage()) == false) {
1080                                        browseList.add(node.getMessage());
1081                                    }
1082                                }
1083                                node.decrementReferenceCount();
1084                            }
1085                        } finally {
1086                            messages.release();
1087                        }
1088                    } finally {
1089                        messagesLock.writeLock().unlock();
1090                    }
1091                }
1092    
1093            } catch (Exception e) {
1094                LOG.error("Problem retrieving message for browse", e);
1095            }
1096        }
1097    
1098        private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize,
1099                List<MessageReference> toExpire) throws Exception {
1100            for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
1101                QueueMessageReference ref = (QueueMessageReference) i.next();
1102                if (ref.isExpired()) {
1103                    toExpire.add(ref);
1104                } else if (l.contains(ref.getMessage()) == false) {
1105                    l.add(ref.getMessage());
1106                }
1107            }
1108        }
1109    
1110        public QueueMessageReference getMessage(String id) {
1111            MessageId msgId = new MessageId(id);
1112            pagedInMessagesLock.readLock().lock();
1113            try{
1114                QueueMessageReference ref = this.pagedInMessages.get(msgId);
1115                if (ref != null) {
1116                    return ref;
1117                }
1118            }finally {
1119                pagedInMessagesLock.readLock().unlock();
1120            }
1121            messagesLock.readLock().lock();
1122            try{
1123                try {
1124                    messages.reset();
1125                    while (messages.hasNext()) {
1126                        MessageReference mr = messages.next();
1127                        QueueMessageReference qmr = createMessageReference(mr.getMessage());
1128                        qmr.decrementReferenceCount();
1129                        messages.rollback(qmr.getMessageId());
1130                        if (msgId.equals(qmr.getMessageId())) {
1131                            return qmr;
1132                        }
1133                    }
1134                } finally {
1135                    messages.release();
1136                }
1137            }finally {
1138                messagesLock.readLock().unlock();
1139            }
1140            return null;
1141        }
1142    
1143        public void purge() throws Exception {
1144            ConnectionContext c = createConnectionContext();
1145            List<MessageReference> list = null;
1146            do {
1147                doPageIn(true);
1148                pagedInMessagesLock.readLock().lock();
1149                try {
1150                    list = new ArrayList<MessageReference>(pagedInMessages.values());
1151                }finally {
1152                    pagedInMessagesLock.readLock().unlock();
1153                }
1154    
1155                for (MessageReference ref : list) {
1156                    try {
1157                        QueueMessageReference r = (QueueMessageReference) ref;
1158                        removeMessage(c, r);
1159                    } catch (IOException e) {
1160                    }
1161                }
1162                // don't spin/hang if stats are out and there is nothing left in the
1163                // store
1164            } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
1165            if (this.destinationStatistics.getMessages().getCount() > 0) {
1166                LOG.warn(getActiveMQDestination().getQualifiedName()
1167                        + " after purge complete, message count stats report: "
1168                        + this.destinationStatistics.getMessages().getCount());
1169            }
1170            gc();
1171            this.destinationStatistics.getMessages().setCount(0);
1172            getMessages().clear();
1173        }
1174    
1175        public void clearPendingMessages() {
1176            messagesLock.writeLock().lock();
1177            try {
1178                if (store != null) {
1179                    store.resetBatching();
1180                }
1181                messages.gc();
1182                messages.reset();
1183                asyncWakeup();
1184            } finally {
1185                messagesLock.writeLock().unlock();
1186            }
1187        }
1188    
1189        /**
1190         * Removes the message matching the given messageId
1191         */
1192        public boolean removeMessage(String messageId) throws Exception {
1193            return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
1194        }
1195    
1196        /**
1197         * Removes the messages matching the given selector
1198         *
1199         * @return the number of messages removed
1200         */
1201        public int removeMatchingMessages(String selector) throws Exception {
1202            return removeMatchingMessages(selector, -1);
1203        }
1204    
1205        /**
1206         * Removes the messages matching the given selector up to the maximum number
1207         * of matched messages
1208         *
1209         * @return the number of messages removed
1210         */
1211        public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
1212            return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
1213        }
1214    
1215        /**
1216         * Removes the messages matching the given filter up to the maximum number
1217         * of matched messages
1218         *
1219         * @return the number of messages removed
1220         */
1221        public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
1222            int movedCounter = 0;
1223            Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1224            ConnectionContext context = createConnectionContext();
1225            do {
1226                doPageIn(true);
1227                pagedInMessagesLock.readLock().lock();
1228                try{
1229                    set.addAll(pagedInMessages.values());
1230                }finally {
1231                    pagedInMessagesLock.readLock().unlock();
1232                }
1233                List<MessageReference> list = new ArrayList<MessageReference>(set);
1234                for (MessageReference ref : list) {
1235                    IndirectMessageReference r = (IndirectMessageReference) ref;
1236                    if (filter.evaluate(context, r)) {
1237    
1238                        removeMessage(context, r);
1239                        set.remove(r);
1240                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1241                            return movedCounter;
1242                        }
1243                    }
1244                }
1245            } while (set.size() < this.destinationStatistics.getMessages().getCount());
1246            return movedCounter;
1247        }
1248    
1249        /**
1250         * Copies the message matching the given messageId
1251         */
1252        public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1253                throws Exception {
1254            return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
1255        }
1256    
1257        /**
1258         * Copies the messages matching the given selector
1259         *
1260         * @return the number of messages copied
1261         */
1262        public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1263                throws Exception {
1264            return copyMatchingMessagesTo(context, selector, dest, -1);
1265        }
1266    
1267        /**
1268         * Copies the messages matching the given selector up to the maximum number
1269         * of matched messages
1270         *
1271         * @return the number of messages copied
1272         */
1273        public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1274                int maximumMessages) throws Exception {
1275            return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
1276        }
1277    
1278        /**
1279         * Copies the messages matching the given filter up to the maximum number of
1280         * matched messages
1281         *
1282         * @return the number of messages copied
1283         */
1284        public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
1285                int maximumMessages) throws Exception {
1286            int movedCounter = 0;
1287            int count = 0;
1288            Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1289            do {
1290                int oldMaxSize = getMaxPageSize();
1291                setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
1292                doPageIn(true);
1293                setMaxPageSize(oldMaxSize);
1294                pagedInMessagesLock.readLock().lock();
1295                try {
1296                    set.addAll(pagedInMessages.values());
1297                }finally {
1298                    pagedInMessagesLock.readLock().unlock();
1299                }
1300                List<MessageReference> list = new ArrayList<MessageReference>(set);
1301                for (MessageReference ref : list) {
1302                    IndirectMessageReference r = (IndirectMessageReference) ref;
1303                    if (filter.evaluate(context, r)) {
1304    
1305                        r.incrementReferenceCount();
1306                        try {
1307                            Message m = r.getMessage();
1308                            BrokerSupport.resend(context, m, dest);
1309                            if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1310                                return movedCounter;
1311                            }
1312                        } finally {
1313                            r.decrementReferenceCount();
1314                        }
1315                    }
1316                    count++;
1317                }
1318            } while (count < this.destinationStatistics.getMessages().getCount());
1319            return movedCounter;
1320        }
1321    
1322        /**
1323         * Move a message
1324         *
1325         * @param context
1326         *            connection context
1327         * @param m
1328         *            QueueMessageReference
1329         * @param dest
1330         *            ActiveMQDestination
1331         * @throws Exception
1332         */
1333        public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
1334            BrokerSupport.resend(context, m.getMessage(), dest);
1335            removeMessage(context, m);
1336            messagesLock.writeLock().lock();
1337            try{
1338                messages.rollback(m.getMessageId());
1339            }finally {
1340                messagesLock.writeLock().unlock();
1341            }
1342            return true;
1343        }
1344    
1345        /**
1346         * Moves the message matching the given messageId
1347         */
1348        public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1349                throws Exception {
1350            return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
1351        }
1352    
1353        /**
1354         * Moves the messages matching the given selector
1355         *
1356         * @return the number of messages removed
1357         */
1358        public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1359                throws Exception {
1360            return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
1361        }
1362    
1363        /**
1364         * Moves the messages matching the given selector up to the maximum number
1365         * of matched messages
1366         */
1367        public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1368                int maximumMessages) throws Exception {
1369            return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
1370        }
1371    
1372        /**
1373         * Moves the messages matching the given filter up to the maximum number of
1374         * matched messages
1375         */
1376        public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
1377                ActiveMQDestination dest, int maximumMessages) throws Exception {
1378            int movedCounter = 0;
1379            Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
1380            do {
1381                doPageIn(true);
1382                pagedInMessagesLock.readLock().lock();
1383                try{
1384                    set.addAll(pagedInMessages.values());
1385                }finally {
1386                    pagedInMessagesLock.readLock().unlock();
1387                }
1388                List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
1389                for (QueueMessageReference ref : list) {
1390                    if (filter.evaluate(context, ref)) {
1391                        // We should only move messages that can be locked.
1392                        moveMessageTo(context, ref, dest);
1393                        set.remove(ref);
1394                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1395                            return movedCounter;
1396                        }
1397                    }
1398                }
1399            } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
1400            return movedCounter;
1401        }
1402    
1403        /**
1404         * @return true if we would like to iterate again
1405         * @see org.apache.activemq.thread.Task#iterate()
1406         */
1407        public boolean iterate() {
1408            MDC.put("activemq.destination", getName());
1409            boolean pageInMoreMessages = false;
1410            synchronized (iteratingMutex) {
1411    
1412                // If optimize dispatch is on or this is a slave this method could be called recursively
1413                // we set this state value to short-circuit wakeup in those cases to avoid that as it
1414                // could lead to errors.
1415                iterationRunning = true;
1416    
1417                // do early to allow dispatch of these waiting messages
1418                synchronized (messagesWaitingForSpace) {
1419                    Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
1420                    while (it.hasNext()) {
1421                        if (!memoryUsage.isFull()) {
1422                            Runnable op = it.next();
1423                            it.remove();
1424                            op.run();
1425                        } else {
1426                            registerCallbackForNotFullNotification();
1427                            break;
1428                        }
1429                    }
1430                }
1431    
1432                if (firstConsumer) {
1433                    firstConsumer = false;
1434                    try {
1435                        if (consumersBeforeDispatchStarts > 0) {
1436                            int timeout = 1000; // wait one second by default if
1437                                                // consumer count isn't reached
1438                            if (timeBeforeDispatchStarts > 0) {
1439                                timeout = timeBeforeDispatchStarts;
1440                            }
1441                            if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
1442                                if (LOG.isDebugEnabled()) {
1443                                    LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
1444                                }
1445                            } else {
1446                                if (LOG.isDebugEnabled()) {
1447                                    LOG.debug(timeout + " ms elapsed and " + consumers.size()
1448                                            + " consumers subscribed. Starting dispatch.");
1449                                }
1450                            }
1451                        }
1452                        if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
1453                            iteratingMutex.wait(timeBeforeDispatchStarts);
1454                            if (LOG.isDebugEnabled()) {
1455                                LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
1456                            }
1457                        }
1458                    } catch (Exception e) {
1459                        LOG.error(e.toString());
1460                    }
1461                }
1462    
1463                BrowserDispatch pendingBrowserDispatch = browserDispatches.poll();
1464    
1465                messagesLock.readLock().lock();
1466                try{
1467                    pageInMoreMessages |= !messages.isEmpty();
1468                } finally {
1469                    messagesLock.readLock().unlock();
1470                }
1471    
1472                pagedInPendingDispatchLock.readLock().lock();
1473                try {
1474                    pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
1475                } finally {
1476                    pagedInPendingDispatchLock.readLock().unlock();
1477                }
1478    
1479                // Perhaps we should page always into the pagedInPendingDispatch
1480                // list if
1481                // !messages.isEmpty(), and then if
1482                // !pagedInPendingDispatch.isEmpty()
1483                // then we do a dispatch.
1484                if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
1485                    try {
1486                        pageInMessages(pendingBrowserDispatch != null);
1487    
1488                    } catch (Throwable e) {
1489                        LOG.error("Failed to page in more queue messages ", e);
1490                    }
1491                }
1492    
1493                if (pendingBrowserDispatch != null) {
1494                    ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
1495                    pagedInMessagesLock.readLock().lock();
1496                    try{
1497                        alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
1498                    }finally {
1499                        pagedInMessagesLock.readLock().unlock();
1500                    }
1501                    if (LOG.isDebugEnabled()) {
1502                        LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
1503                                + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
1504                    }
1505                    do {
1506                        try {
1507                            MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
1508                            msgContext.setDestination(destination);
1509    
1510                            QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
1511                            for (QueueMessageReference node : alreadyDispatchedMessages) {
1512                                if (!node.isAcked()) {
1513                                    msgContext.setMessageReference(node);
1514                                    if (browser.matches(node, msgContext)) {
1515                                        browser.add(node);
1516                                    }
1517                                }
1518                            }
1519                            pendingBrowserDispatch.done();
1520                        } catch (Exception e) {
1521                            LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
1522                        }
1523    
1524                    } while ((pendingBrowserDispatch = browserDispatches.poll()) != null);
1525                }
1526    
1527                if (pendingWakeups.get() > 0) {
1528                    pendingWakeups.decrementAndGet();
1529                }
1530                MDC.remove("activemq.destination");
1531                iterationRunning = false;
1532    
1533                return pendingWakeups.get() > 0;
1534            }
1535        }
1536    
1537        protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
1538            return new MessageReferenceFilter() {
1539                public boolean evaluate(ConnectionContext context, MessageReference r) {
1540                    return messageId.equals(r.getMessageId().toString());
1541                }
1542    
1543                @Override
1544                public String toString() {
1545                    return "MessageIdFilter: " + messageId;
1546                }
1547            };
1548        }
1549    
1550        protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {
1551    
1552            if (selector == null || selector.isEmpty()) {
1553                return new MessageReferenceFilter() {
1554    
1555                    @Override
1556                    public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException {
1557                        return true;
1558                    }
1559                };
1560            }
1561    
1562            final BooleanExpression selectorExpression = SelectorParser.parse(selector);
1563    
1564            return new MessageReferenceFilter() {
1565                public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
1566                    MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
1567    
1568                    messageEvaluationContext.setMessageReference(r);
1569                    if (messageEvaluationContext.getDestination() == null) {
1570                        messageEvaluationContext.setDestination(getActiveMQDestination());
1571                    }
1572    
1573                    return selectorExpression.matches(messageEvaluationContext);
1574                }
1575            };
1576        }
1577    
1578        protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
1579            removeMessage(c, null, r);
1580            pagedInPendingDispatchLock.writeLock().lock();
1581            try {
1582                pagedInPendingDispatch.remove(r);
1583            } finally {
1584                pagedInPendingDispatchLock.writeLock().unlock();
1585            }
1586    
1587        }
1588    
1589        protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
1590            MessageAck ack = new MessageAck();
1591            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
1592            ack.setDestination(destination);
1593            ack.setMessageID(r.getMessageId());
1594            removeMessage(c, subs, r, ack);
1595        }
1596    
1597        protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
1598                MessageAck ack) throws IOException {
1599            reference.setAcked(true);
1600            // This sends the ack the the journal..
1601            if (!ack.isInTransaction()) {
1602                acknowledge(context, sub, ack, reference);
1603                getDestinationStatistics().getDequeues().increment();
1604                dropMessage(reference);
1605            } else {
1606                try {
1607                    acknowledge(context, sub, ack, reference);
1608                } finally {
1609                    context.getTransaction().addSynchronization(new Synchronization() {
1610    
1611                        @Override
1612                        public void afterCommit() throws Exception {
1613                            getDestinationStatistics().getDequeues().increment();
1614                            dropMessage(reference);
1615                            wakeup();
1616                        }
1617    
1618                        @Override
1619                        public void afterRollback() throws Exception {
1620                            reference.setAcked(false);
1621                            wakeup();
1622                        }
1623                    });
1624                }
1625            }
1626            if (ack.isPoisonAck()) {
1627                // message gone to DLQ, is ok to allow redelivery
1628                messagesLock.writeLock().lock();
1629                try{
1630                    messages.rollback(reference.getMessageId());
1631                }finally {
1632                    messagesLock.writeLock().unlock();
1633                }
1634            }
1635    
1636        }
1637    
1638        private void dropMessage(QueueMessageReference reference) {
1639            reference.drop();
1640            destinationStatistics.getMessages().decrement();
1641            pagedInMessagesLock.writeLock().lock();
1642            try{
1643                pagedInMessages.remove(reference.getMessageId());
1644            }finally {
1645                pagedInMessagesLock.writeLock().unlock();
1646            }
1647        }
1648    
1649        public void messageExpired(ConnectionContext context, MessageReference reference) {
1650            messageExpired(context, null, reference);
1651        }
1652    
1653        public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
1654            if (LOG.isDebugEnabled()) {
1655                LOG.debug("message expired: " + reference);
1656            }
1657            broker.messageExpired(context, reference, subs);
1658            destinationStatistics.getExpired().increment();
1659            try {
1660                removeMessage(context, subs, (QueueMessageReference) reference);
1661            } catch (IOException e) {
1662                LOG.error("Failed to remove expired Message from the store ", e);
1663            }
1664        }
1665    
1666        final void sendMessage(final Message msg) throws Exception {
1667            messagesLock.writeLock().lock();
1668            try{
1669                messages.addMessageLast(msg);
1670            }finally {
1671                messagesLock.writeLock().unlock();
1672            }
1673        }
1674    
1675        final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
1676            destinationStatistics.getEnqueues().increment();
1677            destinationStatistics.getMessages().increment();
1678            messageDelivered(context, msg);
1679            consumersLock.readLock().lock();
1680            try {
1681                if (consumers.isEmpty()) {
1682                    onMessageWithNoConsumers(context, msg);
1683                }
1684            }finally {
1685                consumersLock.readLock().unlock();
1686            }
1687            if (LOG.isDebugEnabled()) {
1688                LOG.debug(broker.getBrokerName() + " Message " + msg.getMessageId() + " sent to " + this.destination);
1689            }
1690            wakeup();
1691        }
1692    
1693        public void wakeup() {
1694            if ((optimizedDispatch || isSlave()) && !iterationRunning) {
1695                iterate();
1696                pendingWakeups.incrementAndGet();
1697            } else {
1698                asyncWakeup();
1699            }
1700        }
1701    
1702        private void asyncWakeup() {
1703            try {
1704                pendingWakeups.incrementAndGet();
1705                this.taskRunner.wakeup();
1706            } catch (InterruptedException e) {
1707                LOG.warn("Async task runner failed to wakeup ", e);
1708            }
1709        }
1710    
1711        private boolean isSlave() {
1712            return broker.getBrokerService().isSlave();
1713        }
1714    
1715        private void doPageIn(boolean force) throws Exception {
1716            PendingList newlyPaged = doPageInForDispatch(force);
1717            pagedInPendingDispatchLock.writeLock().lock();
1718            try {
1719                if (pagedInPendingDispatch.isEmpty()) {
1720                    pagedInPendingDispatch.addAll(newlyPaged);
1721    
1722                } else {
1723                    for (MessageReference qmr : newlyPaged) {
1724                        if (!pagedInPendingDispatch.contains(qmr)) {
1725                            pagedInPendingDispatch.addMessageLast(qmr);
1726                        }
1727                    }
1728                }
1729            } finally {
1730                pagedInPendingDispatchLock.writeLock().unlock();
1731            }
1732        }
1733    
1734        private PendingList doPageInForDispatch(boolean force) throws Exception {
1735            List<QueueMessageReference> result = null;
1736            PendingList resultList = null;
1737    
1738            int toPageIn = Math.min(getMaxPageSize(), messages.size());
1739            if (LOG.isDebugEnabled()) {
1740                LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
1741                        + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
1742                        + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount()
1743                        + ", dequeueCount: " + destinationStatistics.getDequeues().getCount());
1744            }
1745    
1746            if (isLazyDispatch() && !force) {
1747                // Only page in the minimum number of messages which can be
1748                // dispatched immediately.
1749                toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
1750            }
1751            int pagedInPendingSize = 0;
1752            pagedInPendingDispatchLock.readLock().lock();
1753            try {
1754                pagedInPendingSize = pagedInPendingDispatch.size();
1755            } finally {
1756                pagedInPendingDispatchLock.readLock().unlock();
1757            }
1758            if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
1759                int count = 0;
1760                result = new ArrayList<QueueMessageReference>(toPageIn);
1761                messagesLock.writeLock().lock();
1762                try {
1763                    try {
1764                        messages.setMaxBatchSize(toPageIn);
1765                        messages.reset();
1766                        while (messages.hasNext() && count < toPageIn) {
1767                            MessageReference node = messages.next();
1768                            messages.remove();
1769    
1770                            QueueMessageReference ref = createMessageReference(node.getMessage());
1771                            if (ref.isExpired()) {
1772                                if (broker.isExpired(ref)) {
1773                                    messageExpired(createConnectionContext(), ref);
1774                                } else {
1775                                    ref.decrementReferenceCount();
1776                                }
1777                            } else {
1778                                result.add(ref);
1779                                count++;
1780                            }
1781                        }
1782                    } finally {
1783                        messages.release();
1784                    }
1785                } finally {
1786                    messagesLock.writeLock().unlock();
1787                }
1788                // Only add new messages, not already pagedIn to avoid multiple
1789                // dispatch attempts
1790                pagedInMessagesLock.writeLock().lock();
1791                try {
1792                    if(isPrioritizedMessages()) {
1793                        resultList = new PrioritizedPendingList();
1794                    } else {
1795                        resultList = new OrderedPendingList();
1796                    }
1797                    for (QueueMessageReference ref : result) {
1798                        if (!pagedInMessages.containsKey(ref.getMessageId())) {
1799                            pagedInMessages.put(ref.getMessageId(), ref);
1800                            resultList.addMessageLast(ref);
1801                        } else {
1802                            ref.decrementReferenceCount();
1803                        }
1804                    }
1805                } finally {
1806                    pagedInMessagesLock.writeLock().unlock();
1807                }
1808            } else {
1809                // Avoid return null list, if condition is not validated
1810                resultList = new OrderedPendingList();
1811            }
1812    
1813            return resultList;
1814        }
1815    
1816        private void doDispatch(PendingList list) throws Exception {
1817            boolean doWakeUp = false;
1818    
1819            pagedInPendingDispatchLock.writeLock().lock();
1820            try {
1821                if (!redeliveredWaitingDispatch.isEmpty()) {
1822                    // Try first to dispatch redelivered messages to keep an
1823                    // proper order
1824                    redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
1825                }
1826                if (!pagedInPendingDispatch.isEmpty()) {
1827                    // Next dispatch anything that had not been
1828                    // dispatched before.
1829                    pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
1830                }
1831                // and now see if we can dispatch the new stuff.. and append to
1832                // the pending
1833                // list anything that does not actually get dispatched.
1834                if (list != null && !list.isEmpty()) {
1835                    if (pagedInPendingDispatch.isEmpty()) {
1836                        pagedInPendingDispatch.addAll(doActualDispatch(list));
1837                    } else {
1838                        for (MessageReference qmr : list) {
1839                            if (!pagedInPendingDispatch.contains(qmr)) {
1840                                pagedInPendingDispatch.addMessageLast(qmr);
1841                            }
1842                        }
1843                        doWakeUp = true;
1844                    }
1845                }
1846            } finally {
1847                pagedInPendingDispatchLock.writeLock().unlock();
1848            }
1849    
1850            if (doWakeUp) {
1851                // avoid lock order contention
1852                asyncWakeup();
1853            }
1854        }
1855    
1856        /**
1857         * @return list of messages that could get dispatched to consumers if they
1858         *         were not full.
1859         */
1860        private PendingList doActualDispatch(PendingList list) throws Exception {
1861            List<Subscription> consumers;
1862            consumersLock.writeLock().lock();
1863    
1864            try {
1865                if (this.consumers.isEmpty() || isSlave()) {
1866                    // slave dispatch happens in processDispatchNotification
1867                    return list;
1868                }
1869                consumers = new ArrayList<Subscription>(this.consumers);
1870            }finally {
1871                consumersLock.writeLock().unlock();
1872            }
1873    
1874            PendingList rc;
1875            if(isPrioritizedMessages()) {
1876                rc = new PrioritizedPendingList();
1877            } else {
1878                rc = new OrderedPendingList();
1879            }
1880    
1881            Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
1882    
1883            for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
1884    
1885                MessageReference node = (MessageReference) iterator.next();
1886                Subscription target = null;
1887                int interestCount = 0;
1888                for (Subscription s : consumers) {
1889                    if (s instanceof QueueBrowserSubscription) {
1890                        interestCount++;
1891                        continue;
1892                    }
1893                    if (!fullConsumers.contains(s)) {
1894                        if (!s.isFull()) {
1895                            if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
1896                                // Dispatch it.
1897                                s.add(node);
1898                                target = s;
1899                                break;
1900                            }
1901                        } else {
1902                            // no further dispatch of list to a full consumer to
1903                            // avoid out of order message receipt
1904                            fullConsumers.add(s);
1905                            LOG.trace("Subscription full {}", s);
1906                        }
1907                    }
1908                    // make sure it gets dispatched again
1909                    if (!node.isDropped()) {
1910                        interestCount++;
1911                    }
1912                }
1913    
1914                if ((target == null && interestCount > 0) || consumers.size() == 0) {
1915                    // This means all subs were full or that there are no
1916                    // consumers...
1917                    rc.addMessageLast((QueueMessageReference) node);
1918                }
1919    
1920                // If it got dispatched, rotate the consumer list to get round robin
1921                // distribution.
1922                if (target != null && !strictOrderDispatch && consumers.size() > 1
1923                        && !dispatchSelector.isExclusiveConsumer(target)) {
1924                    consumersLock.writeLock().lock();
1925                    try {
1926                        if (removeFromConsumerList(target)) {
1927                            addToConsumerList(target);
1928                            consumers = new ArrayList<Subscription>(this.consumers);
1929                        }
1930                    }finally {
1931                        consumersLock.writeLock().unlock();
1932                    }
1933                }
1934            }
1935    
1936            return rc;
1937        }
1938    
1939        protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
1940            boolean result = true;
1941            // Keep message groups together.
1942            String groupId = node.getGroupID();
1943            int sequence = node.getGroupSequence();
1944            if (groupId != null) {
1945    
1946                MessageGroupMap messageGroupOwners = getMessageGroupOwners();
1947                // If we can own the first, then no-one else should own the
1948                // rest.
1949                if (sequence == 1) {
1950                    assignGroup(subscription, messageGroupOwners, node, groupId);
1951                } else {
1952    
1953                    // Make sure that the previous owner is still valid, we may
1954                    // need to become the new owner.
1955                    ConsumerId groupOwner;
1956    
1957                    groupOwner = messageGroupOwners.get(groupId);
1958                    if (groupOwner == null) {
1959                        assignGroup(subscription, messageGroupOwners, node, groupId);
1960                    } else {
1961                        if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
1962                            // A group sequence < 1 is an end of group signal.
1963                            if (sequence < 0) {
1964                                messageGroupOwners.removeGroup(groupId);
1965                                subscription.getConsumerInfo().setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);
1966                            }
1967                        } else {
1968                            result = false;
1969                        }
1970                    }
1971                }
1972            }
1973    
1974            return result;
1975    
1976        }
1977    
1978        protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
1979            messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
1980            Message message = n.getMessage();
1981            if (message instanceof ActiveMQMessage) {
1982                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
1983                try {
1984                    activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
1985                } catch (JMSException e) {
1986                    LOG.warn("Failed to set boolean header: " + e, e);
1987                }
1988            }
1989            subs.getConsumerInfo().setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1);
1990        }
1991    
1992        protected void pageInMessages(boolean force) throws Exception {
1993            doDispatch(doPageInForDispatch(force));
1994        }
1995    
1996        private void addToConsumerList(Subscription sub) {
1997            if (useConsumerPriority) {
1998                consumers.add(sub);
1999                Collections.sort(consumers, orderedCompare);
2000            } else {
2001                consumers.add(sub);
2002            }
2003        }
2004    
2005        private boolean removeFromConsumerList(Subscription sub) {
2006            return consumers.remove(sub);
2007        }
2008    
2009        private int getConsumerMessageCountBeforeFull() throws Exception {
2010            int total = 0;
2011            boolean zeroPrefetch = false;
2012            consumersLock.readLock().lock();
2013            try{
2014                for (Subscription s : consumers) {
2015                    zeroPrefetch |= s.getPrefetchSize() == 0;
2016                    int countBeforeFull = s.countBeforeFull();
2017                    total += countBeforeFull;
2018                }
2019            }finally {
2020                consumersLock.readLock().unlock();
2021            }
2022            if (total == 0 && zeroPrefetch) {
2023                total = 1;
2024            }
2025            return total;
2026        }
2027    
2028        /*
2029         * In slave mode, dispatch is ignored till we get this notification as the
2030         * dispatch process is non deterministic between master and slave. On a
2031         * notification, the actual dispatch to the subscription (as chosen by the
2032         * master) is completed. (non-Javadoc)
2033         * @see
2034         * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
2035         * (org.apache.activemq.command.MessageDispatchNotification)
2036         */
2037        @Override
2038        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
2039            // do dispatch
2040            Subscription sub = getMatchingSubscription(messageDispatchNotification);
2041            if (sub != null) {
2042                MessageReference message = getMatchingMessage(messageDispatchNotification);
2043                sub.add(message);
2044                sub.processMessageDispatchNotification(messageDispatchNotification);
2045            }
2046        }
2047    
2048        private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
2049                throws Exception {
2050            QueueMessageReference message = null;
2051            MessageId messageId = messageDispatchNotification.getMessageId();
2052    
2053            pagedInPendingDispatchLock.writeLock().lock();
2054            try {
2055                for (MessageReference ref : pagedInPendingDispatch) {
2056                    if (messageId.equals(ref.getMessageId())) {
2057                        message = (QueueMessageReference)ref;
2058                        pagedInPendingDispatch.remove(ref);
2059                        break;
2060                    }
2061                }
2062            } finally {
2063                pagedInPendingDispatchLock.writeLock().unlock();
2064            }
2065    
2066            if (message == null) {
2067                pagedInMessagesLock.readLock().lock();
2068                try {
2069                    message = pagedInMessages.get(messageId);
2070                } finally {
2071                    pagedInMessagesLock.readLock().unlock();
2072                }
2073            }
2074    
2075            if (message == null) {
2076                messagesLock.writeLock().lock();
2077                try {
2078                    try {
2079                        messages.setMaxBatchSize(getMaxPageSize());
2080                        messages.reset();
2081                        while (messages.hasNext()) {
2082                            MessageReference node = messages.next();
2083                            messages.remove();
2084                            if (messageId.equals(node.getMessageId())) {
2085                                message = this.createMessageReference(node.getMessage());
2086                                break;
2087                            }
2088                        }
2089                    } finally {
2090                        messages.release();
2091                    }
2092                } finally {
2093                    messagesLock.writeLock().unlock();
2094                }
2095            }
2096    
2097            if (message == null) {
2098                Message msg = loadMessage(messageId);
2099                if (msg != null) {
2100                    message = this.createMessageReference(msg);
2101                }
2102            }
2103    
2104            if (message == null) {
2105                throw new JMSException("Slave broker out of sync with master - Message: "
2106                        + messageDispatchNotification.getMessageId() + " on "
2107                        + messageDispatchNotification.getDestination() + " does not exist among pending("
2108                        + pagedInPendingDispatch.size() + ") for subscription: "
2109                        + messageDispatchNotification.getConsumerId());
2110            }
2111            return message;
2112        }
2113    
2114        /**
2115         * Find a consumer that matches the id in the message dispatch notification
2116         *
2117         * @param messageDispatchNotification
2118         * @return sub or null if the subscription has been removed before dispatch
2119         * @throws JMSException
2120         */
2121        private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
2122                throws JMSException {
2123            Subscription sub = null;
2124            consumersLock.readLock().lock();
2125            try {
2126                for (Subscription s : consumers) {
2127                    if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
2128                        sub = s;
2129                        break;
2130                    }
2131                }
2132            }finally {
2133                consumersLock.readLock().unlock();
2134            }
2135            return sub;
2136        }
2137    
2138        public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
2139            if (oldPercentUsage > newPercentUsage) {
2140                asyncWakeup();
2141            }
2142        }
2143    
2144        @Override
2145        protected Logger getLog() {
2146            return LOG;
2147        }
2148    
2149        protected boolean isOptimizeStorage(){
2150            boolean result = false;
2151            if (isDoOptimzeMessageStorage()){
2152                consumersLock.readLock().lock();
2153                try{
2154                    if (consumers.isEmpty()==false){
2155                        result = true;
2156                        for (Subscription s : consumers) {
2157                            if (s.getPrefetchSize()==0){
2158                                result = false;
2159                                break;
2160                            }
2161                            if (s.isSlowConsumer()){
2162                                result = false;
2163                                break;
2164                            }
2165                            if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
2166                                result = false;
2167                                break;
2168                            }
2169                        }
2170                    }
2171                }finally {
2172                    consumersLock.readLock().unlock();
2173                }
2174            }
2175            return result;
2176        }
2177    }