001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.List;
021    import javax.jms.ResourceAllocationException;
022    import org.apache.activemq.advisory.AdvisorySupport;
023    import org.apache.activemq.broker.Broker;
024    import org.apache.activemq.broker.BrokerService;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.ProducerBrokerExchange;
027    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
028    import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQTopic;
031    import org.apache.activemq.command.Message;
032    import org.apache.activemq.command.MessageAck;
033    import org.apache.activemq.command.MessageDispatchNotification;
034    import org.apache.activemq.command.ProducerInfo;
035    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
036    import org.apache.activemq.security.SecurityContext;
037    import org.apache.activemq.state.ProducerState;
038    import org.apache.activemq.store.MessageStore;
039    import org.apache.activemq.thread.Scheduler;
040    import org.apache.activemq.usage.MemoryUsage;
041    import org.apache.activemq.usage.SystemUsage;
042    import org.apache.activemq.usage.Usage;
043    import org.slf4j.Logger;
044    
045    /**
046     *
047     */
048    public abstract class BaseDestination implements Destination {
049        /**
050         * The maximum number of messages to page in to the destination from
051         * persistent storage
052         */
053        public static final int MAX_PAGE_SIZE = 200;
054        public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
055        public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
056        public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
057        public static final int MAX_PRODUCERS_TO_AUDIT = 64;
058        public static final int MAX_AUDIT_DEPTH = 2048;
059    
060        protected final ActiveMQDestination destination;
061        protected final Broker broker;
062        protected final MessageStore store;
063        protected SystemUsage systemUsage;
064        protected MemoryUsage memoryUsage;
065        private boolean producerFlowControl = true;
066        private boolean alwaysRetroactive = false;
067        protected boolean warnOnProducerFlowControl = true;
068        protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
069    
070        private int maxProducersToAudit = 1024;
071        private int maxAuditDepth = 2048;
072        private boolean enableAudit = true;
073        private int maxPageSize = MAX_PAGE_SIZE;
074        private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
075        private boolean useCache = true;
076        private int minimumMessageSize = 1024;
077        private boolean lazyDispatch = false;
078        private boolean advisoryForSlowConsumers;
079        private boolean advisoryForFastProducers;
080        private boolean advisoryForDiscardingMessages;
081        private boolean advisoryWhenFull;
082        private boolean advisoryForDelivery;
083        private boolean advisoryForConsumed;
084        private boolean sendAdvisoryIfNoConsumers;
085        protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
086        protected final BrokerService brokerService;
087        protected final Broker regionBroker;
088        protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
089        protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
090        private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
091        protected int cursorMemoryHighWaterMark = 70;
092        protected int storeUsageHighWaterMark = 100;
093        private SlowConsumerStrategy slowConsumerStrategy;
094        private boolean prioritizedMessages;
095        private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
096        private boolean gcIfInactive;
097        private boolean gcWithNetworkConsumers;
098        private long lastActiveTime=0l;
099        private boolean reduceMemoryFootprint = false;
100        protected final Scheduler scheduler;
101        private boolean disposed = false;
102        private boolean doOptimzeMessageStorage = true;
103        /*
104         * percentage of in-flight messages above which optimize message store is disabled
105         */
106        private int optimizeMessageStoreInFlightLimit = 10;
107    
108        /**
109         * @param brokerService
110         * @param store
111         * @param destination
112         * @param parentStats
113         * @throws Exception
114         */
115        public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
116            this.brokerService = brokerService;
117            this.broker = brokerService.getBroker();
118            this.store = store;
119            this.destination = destination;
120            // let's copy the enabled property from the parent DestinationStatistics
121            this.destinationStatistics.setEnabled(parentStats.isEnabled());
122            this.destinationStatistics.setParent(parentStats);
123            this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
124            this.memoryUsage = this.systemUsage.getMemoryUsage();
125            this.memoryUsage.setUsagePortion(1.0f);
126            this.regionBroker = brokerService.getRegionBroker();
127            this.scheduler = brokerService.getBroker().getScheduler();
128        }
129    
130        /**
131         * initialize the destination
132         *
133         * @throws Exception
134         */
135        public void initialize() throws Exception {
136            // Let the store know what usage manager we are using so that he can
137            // flush messages to disk when usage gets high.
138            if (store != null) {
139                store.setMemoryUsage(this.memoryUsage);
140            }
141        }
142    
143        /**
144         * @return the producerFlowControl
145         */
146        public boolean isProducerFlowControl() {
147            return producerFlowControl;
148        }
149    
150        /**
151         * @param producerFlowControl the producerFlowControl to set
152         */
153        public void setProducerFlowControl(boolean producerFlowControl) {
154            this.producerFlowControl = producerFlowControl;
155        }
156    
157        public boolean isAlwaysRetroactive() {
158            return alwaysRetroactive;
159        }
160    
161        public void setAlwaysRetroactive(boolean alwaysRetroactive) {
162            this.alwaysRetroactive = alwaysRetroactive;
163        }
164    
165        /**
166         * Set's the interval at which warnings about producers being blocked by
167         * resource usage will be triggered. Values of 0 or less will disable
168         * warnings
169         *
170         * @param blockedProducerWarningInterval the interval at which warning about
171         *            blocked producers will be triggered.
172         */
173        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
174            this.blockedProducerWarningInterval = blockedProducerWarningInterval;
175        }
176    
177        /**
178         *
179         * @return the interval at which warning about blocked producers will be
180         *         triggered.
181         */
182        public long getBlockedProducerWarningInterval() {
183            return blockedProducerWarningInterval;
184        }
185    
186        /**
187         * @return the maxProducersToAudit
188         */
189        public int getMaxProducersToAudit() {
190            return maxProducersToAudit;
191        }
192    
193        /**
194         * @param maxProducersToAudit the maxProducersToAudit to set
195         */
196        public void setMaxProducersToAudit(int maxProducersToAudit) {
197            this.maxProducersToAudit = maxProducersToAudit;
198        }
199    
200        /**
201         * @return the maxAuditDepth
202         */
203        public int getMaxAuditDepth() {
204            return maxAuditDepth;
205        }
206    
207        /**
208         * @param maxAuditDepth the maxAuditDepth to set
209         */
210        public void setMaxAuditDepth(int maxAuditDepth) {
211            this.maxAuditDepth = maxAuditDepth;
212        }
213    
214        /**
215         * @return the enableAudit
216         */
217        public boolean isEnableAudit() {
218            return enableAudit;
219        }
220    
221        /**
222         * @param enableAudit the enableAudit to set
223         */
224        public void setEnableAudit(boolean enableAudit) {
225            this.enableAudit = enableAudit;
226        }
227    
228        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
229            destinationStatistics.getProducers().increment();
230            this.lastActiveTime=0l;
231        }
232    
233        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
234            destinationStatistics.getProducers().decrement();
235        }
236    
237        public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
238            destinationStatistics.getConsumers().increment();
239            this.lastActiveTime=0l;
240        }
241    
242        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
243            destinationStatistics.getConsumers().decrement();
244        }
245    
246    
247        public final MemoryUsage getMemoryUsage() {
248            return memoryUsage;
249        }
250    
251        public DestinationStatistics getDestinationStatistics() {
252            return destinationStatistics;
253        }
254    
255        public ActiveMQDestination getActiveMQDestination() {
256            return destination;
257        }
258    
259        public final String getName() {
260            return getActiveMQDestination().getPhysicalName();
261        }
262    
263        public final MessageStore getMessageStore() {
264            return store;
265        }
266    
267        public boolean isActive() {
268            boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
269                               destinationStatistics.getProducers().getCount() != 0;
270            if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
271                isActive = hasRegularConsumers(getConsumers());
272            }
273            return isActive;
274        }
275    
276        public int getMaxPageSize() {
277            return maxPageSize;
278        }
279    
280        public void setMaxPageSize(int maxPageSize) {
281            this.maxPageSize = maxPageSize;
282        }
283    
284        public int getMaxBrowsePageSize() {
285            return this.maxBrowsePageSize;
286        }
287    
288        public void setMaxBrowsePageSize(int maxPageSize) {
289            this.maxBrowsePageSize = maxPageSize;
290        }
291    
292        public int getMaxExpirePageSize() {
293            return this.maxExpirePageSize;
294        }
295    
296        public void setMaxExpirePageSize(int maxPageSize) {
297            this.maxExpirePageSize = maxPageSize;
298        }
299    
300        public void setExpireMessagesPeriod(long expireMessagesPeriod) {
301            this.expireMessagesPeriod = expireMessagesPeriod;
302        }
303    
304        public long getExpireMessagesPeriod() {
305            return expireMessagesPeriod;
306        }
307    
308        public boolean isUseCache() {
309            return useCache;
310        }
311    
312        public void setUseCache(boolean useCache) {
313            this.useCache = useCache;
314        }
315    
316        public int getMinimumMessageSize() {
317            return minimumMessageSize;
318        }
319    
320        public void setMinimumMessageSize(int minimumMessageSize) {
321            this.minimumMessageSize = minimumMessageSize;
322        }
323    
324        public boolean isLazyDispatch() {
325            return lazyDispatch;
326        }
327    
328        public void setLazyDispatch(boolean lazyDispatch) {
329            this.lazyDispatch = lazyDispatch;
330        }
331    
332        protected long getDestinationSequenceId() {
333            return regionBroker.getBrokerSequenceId();
334        }
335    
336        /**
337         * @return the advisoryForSlowConsumers
338         */
339        public boolean isAdvisoryForSlowConsumers() {
340            return advisoryForSlowConsumers;
341        }
342    
343        /**
344         * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
345         */
346        public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
347            this.advisoryForSlowConsumers = advisoryForSlowConsumers;
348        }
349    
350        /**
351         * @return the advisoryForDiscardingMessages
352         */
353        public boolean isAdvisoryForDiscardingMessages() {
354            return advisoryForDiscardingMessages;
355        }
356    
357        /**
358         * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
359         *            set
360         */
361        public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
362            this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
363        }
364    
365        /**
366         * @return the advisoryWhenFull
367         */
368        public boolean isAdvisoryWhenFull() {
369            return advisoryWhenFull;
370        }
371    
372        /**
373         * @param advisoryWhenFull the advisoryWhenFull to set
374         */
375        public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
376            this.advisoryWhenFull = advisoryWhenFull;
377        }
378    
379        /**
380         * @return the advisoryForDelivery
381         */
382        public boolean isAdvisoryForDelivery() {
383            return advisoryForDelivery;
384        }
385    
386        /**
387         * @param advisoryForDelivery the advisoryForDelivery to set
388         */
389        public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
390            this.advisoryForDelivery = advisoryForDelivery;
391        }
392    
393        /**
394         * @return the advisoryForConsumed
395         */
396        public boolean isAdvisoryForConsumed() {
397            return advisoryForConsumed;
398        }
399    
400        /**
401         * @param advisoryForConsumed the advisoryForConsumed to set
402         */
403        public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
404            this.advisoryForConsumed = advisoryForConsumed;
405        }
406    
407        /**
408         * @return the advisdoryForFastProducers
409         */
410        public boolean isAdvisoryForFastProducers() {
411            return advisoryForFastProducers;
412        }
413    
414        /**
415         * @param advisoryForFastProducers the advisdoryForFastProducers to set
416         */
417        public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
418            this.advisoryForFastProducers = advisoryForFastProducers;
419        }
420    
421        public boolean isSendAdvisoryIfNoConsumers() {
422            return sendAdvisoryIfNoConsumers;
423        }
424    
425        public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
426            this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
427        }
428    
429        /**
430         * @return the dead letter strategy
431         */
432        public DeadLetterStrategy getDeadLetterStrategy() {
433            return deadLetterStrategy;
434        }
435    
436        /**
437         * set the dead letter strategy
438         *
439         * @param deadLetterStrategy
440         */
441        public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
442            this.deadLetterStrategy = deadLetterStrategy;
443        }
444    
445        public int getCursorMemoryHighWaterMark() {
446            return this.cursorMemoryHighWaterMark;
447        }
448    
449        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
450            this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
451        }
452    
453        /**
454         * called when message is consumed
455         *
456         * @param context
457         * @param messageReference
458         */
459        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
460            if (advisoryForConsumed) {
461                broker.messageConsumed(context, messageReference);
462            }
463        }
464    
465        /**
466         * Called when message is delivered to the broker
467         *
468         * @param context
469         * @param messageReference
470         */
471        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
472            if (advisoryForDelivery) {
473                broker.messageDelivered(context, messageReference);
474            }
475        }
476    
477        /**
478         * Called when a message is discarded - e.g. running low on memory This will
479         * happen only if the policy is enabled - e.g. non durable topics
480         *
481         * @param context
482         * @param messageReference
483         */
484        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
485            if (advisoryForDiscardingMessages) {
486                broker.messageDiscarded(context, sub, messageReference);
487            }
488        }
489    
490        /**
491         * Called when there is a slow consumer
492         *
493         * @param context
494         * @param subs
495         */
496        public void slowConsumer(ConnectionContext context, Subscription subs) {
497            if (advisoryForSlowConsumers) {
498                broker.slowConsumer(context, this, subs);
499            }
500            if (slowConsumerStrategy != null) {
501                slowConsumerStrategy.slowConsumer(context, subs);
502            }
503        }
504    
505        /**
506         * Called to notify a producer is too fast
507         *
508         * @param context
509         * @param producerInfo
510         */
511        public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
512            if (advisoryForFastProducers) {
513                broker.fastProducer(context, producerInfo, getActiveMQDestination());
514            }
515        }
516    
517        /**
518         * Called when a Usage reaches a limit
519         *
520         * @param context
521         * @param usage
522         */
523        public void isFull(ConnectionContext context, Usage<?> usage) {
524            if (advisoryWhenFull) {
525                broker.isFull(context, this, usage);
526            }
527        }
528    
529        public void dispose(ConnectionContext context) throws IOException {
530            if (this.store != null) {
531                this.store.removeAllMessages(context);
532                this.store.dispose(context);
533            }
534            this.destinationStatistics.setParent(null);
535            this.memoryUsage.stop();
536            this.disposed = true;
537        }
538    
539        public boolean isDisposed() {
540            return this.disposed;
541        }
542    
543        /**
544         * Provides a hook to allow messages with no consumer to be processed in
545         * some way - such as to send to a dead letter queue or something..
546         */
547        protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
548            if (!msg.isPersistent()) {
549                if (isSendAdvisoryIfNoConsumers()) {
550                    // allow messages with no consumers to be dispatched to a dead
551                    // letter queue
552                    if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
553    
554                        Message message = msg.copy();
555                        // The original destination and transaction id do not get
556                        // filled when the message is first sent,
557                        // it is only populated if the message is routed to another
558                        // destination like the DLQ
559                        if (message.getOriginalDestination() != null) {
560                            message.setOriginalDestination(message.getDestination());
561                        }
562                        if (message.getOriginalTransactionId() != null) {
563                            message.setOriginalTransactionId(message.getTransactionId());
564                        }
565    
566                        ActiveMQTopic advisoryTopic;
567                        if (destination.isQueue()) {
568                            advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
569                        } else {
570                            advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
571                        }
572                        message.setDestination(advisoryTopic);
573                        message.setTransactionId(null);
574    
575                        // Disable flow control for this since since we don't want
576                        // to block.
577                        boolean originalFlowControl = context.isProducerFlowControl();
578                        try {
579                            context.setProducerFlowControl(false);
580                            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
581                            producerExchange.setMutable(false);
582                            producerExchange.setConnectionContext(context);
583                            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
584                            context.getBroker().send(producerExchange, message);
585                        } finally {
586                            context.setProducerFlowControl(originalFlowControl);
587                        }
588    
589                    }
590                }
591            }
592        }
593    
594        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
595        }
596    
597        public final int getStoreUsageHighWaterMark() {
598            return this.storeUsageHighWaterMark;
599        }
600    
601        public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
602            this.storeUsageHighWaterMark = storeUsageHighWaterMark;
603        }
604    
605        protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
606            waitForSpace(context, usage, 100, warning);
607        }
608    
609        protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
610            if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
611                getLog().debug("sendFailIfNoSpace, forcing exception on send, usage:  " + usage + ": " + warning);
612                throw new ResourceAllocationException(warning);
613            }
614            if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
615                if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
616                    getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
617                    throw new ResourceAllocationException(warning);
618                }
619            } else {
620                long start = System.currentTimeMillis();
621                long nextWarn = start;
622                while (!usage.waitForSpace(1000, highWaterMark)) {
623                    if (context.getStopping().get()) {
624                        throw new IOException("Connection closed, send aborted.");
625                    }
626    
627                    long now = System.currentTimeMillis();
628                    if (now >= nextWarn) {
629                        getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
630                        nextWarn = now + blockedProducerWarningInterval;
631                    }
632                }
633            }
634        }
635    
636        protected abstract Logger getLog();
637    
638        public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
639            this.slowConsumerStrategy = slowConsumerStrategy;
640        }
641    
642        public SlowConsumerStrategy getSlowConsumerStrategy() {
643            return this.slowConsumerStrategy;
644        }
645    
646    
647        public boolean isPrioritizedMessages() {
648            return this.prioritizedMessages;
649        }
650    
651        public void setPrioritizedMessages(boolean prioritizedMessages) {
652            this.prioritizedMessages = prioritizedMessages;
653            if (store != null) {
654                store.setPrioritizedMessages(prioritizedMessages);
655            }
656        }
657    
658        /**
659         * @return the inactiveTimoutBeforeGC
660         */
661        public long getInactiveTimoutBeforeGC() {
662            return this.inactiveTimoutBeforeGC;
663        }
664    
665        /**
666         * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
667         */
668        public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
669            this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
670        }
671    
672        /**
673         * @return the gcIfInactive
674         */
675        public boolean isGcIfInactive() {
676            return this.gcIfInactive;
677        }
678    
679        /**
680         * @param gcIfInactive the gcIfInactive to set
681         */
682        public void setGcIfInactive(boolean gcIfInactive) {
683            this.gcIfInactive = gcIfInactive;
684        }
685    
686        /**
687         * Indicate if it is ok to gc destinations that have only network consumers
688         * @param gcWithNetworkConsumers
689         */
690        public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
691            this.gcWithNetworkConsumers = gcWithNetworkConsumers;
692        }
693    
694        public boolean isGcWithNetworkConsumers() {
695            return gcWithNetworkConsumers;
696        }
697    
698        public void markForGC(long timeStamp) {
699            if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
700                    && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
701                this.lastActiveTime = timeStamp;
702            }
703        }
704    
705        public boolean canGC() {
706            boolean result = false;
707            if (isGcIfInactive()&& this.lastActiveTime != 0l) {
708                if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
709                    result = true;
710                }
711            }
712            return result;
713        }
714    
715        public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
716            this.reduceMemoryFootprint = reduceMemoryFootprint;
717        }
718    
719        protected boolean isReduceMemoryFootprint() {
720            return this.reduceMemoryFootprint;
721        }
722    
723        public boolean isDoOptimzeMessageStorage() {
724            return doOptimzeMessageStorage;
725        }
726    
727        public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
728            this.doOptimzeMessageStorage = doOptimzeMessageStorage;
729        }
730    
731        public int getOptimizeMessageStoreInFlightLimit() {
732            return optimizeMessageStoreInFlightLimit;
733        }
734    
735        public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
736            this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
737        }
738    
739    
740        public abstract List<Subscription> getConsumers();
741    
742        protected boolean hasRegularConsumers(List<Subscription> consumers) {
743            boolean hasRegularConsumers = false;
744            for (Subscription subscription: consumers) {
745                if (!subscription.getConsumerInfo().isNetworkSubscription()) {
746                    hasRegularConsumers = true;
747                    break;
748                }
749            }
750            return hasRegularConsumers;
751        }
752    
753        protected ConnectionContext createConnectionContext() {
754            ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
755            answer.setBroker(this.broker);
756            answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
757            answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
758            return answer;
759        }
760    
761        protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
762            // the original ack may be a ranged ack, but we are trying to delete
763            // a specific
764            // message store here so we need to convert to a non ranged ack.
765            if (ack.getMessageCount() > 0) {
766                // Dup the ack
767                MessageAck a = new MessageAck();
768                ack.copy(a);
769                ack = a;
770                // Convert to non-ranged.
771                ack.setFirstMessageId(node.getMessageId());
772                ack.setLastMessageId(node.getMessageId());
773                ack.setMessageCount(1);
774            }
775            return ack;
776        }
777    
778    }