001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.List;
021    import javax.jms.ResourceAllocationException;
022    import org.apache.activemq.advisory.AdvisorySupport;
023    import org.apache.activemq.broker.Broker;
024    import org.apache.activemq.broker.BrokerService;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.ProducerBrokerExchange;
027    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
028    import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQTopic;
031    import org.apache.activemq.command.Message;
032    import org.apache.activemq.command.MessageAck;
033    import org.apache.activemq.command.MessageDispatchNotification;
034    import org.apache.activemq.command.ProducerInfo;
035    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
036    import org.apache.activemq.security.SecurityContext;
037    import org.apache.activemq.state.ProducerState;
038    import org.apache.activemq.store.MessageStore;
039    import org.apache.activemq.thread.Scheduler;
040    import org.apache.activemq.usage.MemoryUsage;
041    import org.apache.activemq.usage.SystemUsage;
042    import org.apache.activemq.usage.Usage;
043    import org.slf4j.Logger;
044    
045    /**
046     *
047     */
048    public abstract class BaseDestination implements Destination {
049        /**
050         * The maximum number of messages to page in to the destination from
051         * persistent storage
052         */
053        public static final int MAX_PAGE_SIZE = 200;
054        public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
055        public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
056        public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
057        public static final int MAX_PRODUCERS_TO_AUDIT = 64;
058        public static final int MAX_AUDIT_DEPTH = 2048;
059    
060        protected final ActiveMQDestination destination;
061        protected final Broker broker;
062        protected final MessageStore store;
063        protected SystemUsage systemUsage;
064        protected MemoryUsage memoryUsage;
065        private boolean producerFlowControl = true;
066        private boolean alwaysRetroactive = false;
067        protected boolean warnOnProducerFlowControl = true;
068        protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
069    
070        private int maxProducersToAudit = 1024;
071        private int maxAuditDepth = 2048;
072        private boolean enableAudit = true;
073        private int maxPageSize = MAX_PAGE_SIZE;
074        private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
075        private boolean useCache = true;
076        private int minimumMessageSize = 1024;
077        private boolean lazyDispatch = false;
078        private boolean advisoryForSlowConsumers;
079        private boolean advisoryForFastProducers;
080        private boolean advisoryForDiscardingMessages;
081        private boolean advisoryWhenFull;
082        private boolean advisoryForDelivery;
083        private boolean advisoryForConsumed;
084        private boolean sendAdvisoryIfNoConsumers;
085        protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
086        protected final BrokerService brokerService;
087        protected final Broker regionBroker;
088        protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
089        protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
090        private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
091        protected int cursorMemoryHighWaterMark = 70;
092        protected int storeUsageHighWaterMark = 100;
093        private SlowConsumerStrategy slowConsumerStrategy;
094        private boolean prioritizedMessages;
095        private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
096        private boolean gcIfInactive;
097        private boolean gcWithNetworkConsumers;
098        private long lastActiveTime=0l;
099        private boolean reduceMemoryFootprint = false;
100        protected final Scheduler scheduler;
101        private boolean disposed = false;
102        private boolean doOptimzeMessageStorage = true;
103        /*
104         * percentage of in-flight messages above which optimize message store is disabled
105         */
106        private int optimizeMessageStoreInFlightLimit = 10;
107    
108        /**
109         * @param brokerService
110         * @param store
111         * @param destination
112         * @param parentStats
113         * @throws Exception
114         */
115        public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
116            this.brokerService = brokerService;
117            this.broker = brokerService.getBroker();
118            this.store = store;
119            this.destination = destination;
120            // let's copy the enabled property from the parent DestinationStatistics
121            this.destinationStatistics.setEnabled(parentStats.isEnabled());
122            this.destinationStatistics.setParent(parentStats);
123            this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
124            this.memoryUsage = this.systemUsage.getMemoryUsage();
125            this.memoryUsage.setUsagePortion(1.0f);
126            this.regionBroker = brokerService.getRegionBroker();
127            this.scheduler = brokerService.getBroker().getScheduler();
128        }
129    
130        /**
131         * initialize the destination
132         *
133         * @throws Exception
134         */
135        public void initialize() throws Exception {
136            // Let the store know what usage manager we are using so that he can
137            // flush messages to disk when usage gets high.
138            if (store != null) {
139                store.setMemoryUsage(this.memoryUsage);
140            }
141        }
142    
143        /**
144         * @return the producerFlowControl
145         */
146        public boolean isProducerFlowControl() {
147            return producerFlowControl;
148        }
149    
150        /**
151         * @param producerFlowControl the producerFlowControl to set
152         */
153        public void setProducerFlowControl(boolean producerFlowControl) {
154            this.producerFlowControl = producerFlowControl;
155        }
156    
157        public boolean isAlwaysRetroactive() {
158            return alwaysRetroactive;
159        }
160    
161        public void setAlwaysRetroactive(boolean alwaysRetroactive) {
162            this.alwaysRetroactive = alwaysRetroactive;
163        }
164    
165        /**
166         * Set's the interval at which warnings about producers being blocked by
167         * resource usage will be triggered. Values of 0 or less will disable
168         * warnings
169         *
170         * @param blockedProducerWarningInterval the interval at which warning about
171         *            blocked producers will be triggered.
172         */
173        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
174            this.blockedProducerWarningInterval = blockedProducerWarningInterval;
175        }
176    
177        /**
178         *
179         * @return the interval at which warning about blocked producers will be
180         *         triggered.
181         */
182        public long getBlockedProducerWarningInterval() {
183            return blockedProducerWarningInterval;
184        }
185    
186        /**
187         * @return the maxProducersToAudit
188         */
189        public int getMaxProducersToAudit() {
190            return maxProducersToAudit;
191        }
192    
193        /**
194         * @param maxProducersToAudit the maxProducersToAudit to set
195         */
196        public void setMaxProducersToAudit(int maxProducersToAudit) {
197            this.maxProducersToAudit = maxProducersToAudit;
198        }
199    
200        /**
201         * @return the maxAuditDepth
202         */
203        public int getMaxAuditDepth() {
204            return maxAuditDepth;
205        }
206    
207        /**
208         * @param maxAuditDepth the maxAuditDepth to set
209         */
210        public void setMaxAuditDepth(int maxAuditDepth) {
211            this.maxAuditDepth = maxAuditDepth;
212        }
213    
214        /**
215         * @return the enableAudit
216         */
217        public boolean isEnableAudit() {
218            return enableAudit;
219        }
220    
221        /**
222         * @param enableAudit the enableAudit to set
223         */
224        public void setEnableAudit(boolean enableAudit) {
225            this.enableAudit = enableAudit;
226        }
227    
228        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
229            destinationStatistics.getProducers().increment();
230            this.lastActiveTime=0l;
231        }
232    
233        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
234            destinationStatistics.getProducers().decrement();
235        }
236    
237        public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
238            destinationStatistics.getConsumers().increment();
239            this.lastActiveTime=0l;
240        }
241    
242        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
243            destinationStatistics.getConsumers().decrement();
244        }
245    
246    
247        public final MemoryUsage getMemoryUsage() {
248            return memoryUsage;
249        }
250    
251            public void setMemoryUsage(MemoryUsage memoryUsage) {
252            this.memoryUsage = memoryUsage;
253        }
254    
255        public DestinationStatistics getDestinationStatistics() {
256            return destinationStatistics;
257        }
258    
259        public ActiveMQDestination getActiveMQDestination() {
260            return destination;
261        }
262    
263        public final String getName() {
264            return getActiveMQDestination().getPhysicalName();
265        }
266    
267        public final MessageStore getMessageStore() {
268            return store;
269        }
270    
271        public boolean isActive() {
272            boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
273                               destinationStatistics.getProducers().getCount() != 0;
274            if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
275                isActive = hasRegularConsumers(getConsumers());
276            }
277            return isActive;
278        }
279    
280        public int getMaxPageSize() {
281            return maxPageSize;
282        }
283    
284        public void setMaxPageSize(int maxPageSize) {
285            this.maxPageSize = maxPageSize;
286        }
287    
288        public int getMaxBrowsePageSize() {
289            return this.maxBrowsePageSize;
290        }
291    
292        public void setMaxBrowsePageSize(int maxPageSize) {
293            this.maxBrowsePageSize = maxPageSize;
294        }
295    
296        public int getMaxExpirePageSize() {
297            return this.maxExpirePageSize;
298        }
299    
300        public void setMaxExpirePageSize(int maxPageSize) {
301            this.maxExpirePageSize = maxPageSize;
302        }
303    
304        public void setExpireMessagesPeriod(long expireMessagesPeriod) {
305            this.expireMessagesPeriod = expireMessagesPeriod;
306        }
307    
308        public long getExpireMessagesPeriod() {
309            return expireMessagesPeriod;
310        }
311    
312        public boolean isUseCache() {
313            return useCache;
314        }
315    
316        public void setUseCache(boolean useCache) {
317            this.useCache = useCache;
318        }
319    
320        public int getMinimumMessageSize() {
321            return minimumMessageSize;
322        }
323    
324        public void setMinimumMessageSize(int minimumMessageSize) {
325            this.minimumMessageSize = minimumMessageSize;
326        }
327    
328        public boolean isLazyDispatch() {
329            return lazyDispatch;
330        }
331    
332        public void setLazyDispatch(boolean lazyDispatch) {
333            this.lazyDispatch = lazyDispatch;
334        }
335    
336        protected long getDestinationSequenceId() {
337            return regionBroker.getBrokerSequenceId();
338        }
339    
340        /**
341         * @return the advisoryForSlowConsumers
342         */
343        public boolean isAdvisoryForSlowConsumers() {
344            return advisoryForSlowConsumers;
345        }
346    
347        /**
348         * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
349         */
350        public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
351            this.advisoryForSlowConsumers = advisoryForSlowConsumers;
352        }
353    
354        /**
355         * @return the advisoryForDiscardingMessages
356         */
357        public boolean isAdvisoryForDiscardingMessages() {
358            return advisoryForDiscardingMessages;
359        }
360    
361        /**
362         * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
363         *            set
364         */
365        public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
366            this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
367        }
368    
369        /**
370         * @return the advisoryWhenFull
371         */
372        public boolean isAdvisoryWhenFull() {
373            return advisoryWhenFull;
374        }
375    
376        /**
377         * @param advisoryWhenFull the advisoryWhenFull to set
378         */
379        public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
380            this.advisoryWhenFull = advisoryWhenFull;
381        }
382    
383        /**
384         * @return the advisoryForDelivery
385         */
386        public boolean isAdvisoryForDelivery() {
387            return advisoryForDelivery;
388        }
389    
390        /**
391         * @param advisoryForDelivery the advisoryForDelivery to set
392         */
393        public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
394            this.advisoryForDelivery = advisoryForDelivery;
395        }
396    
397        /**
398         * @return the advisoryForConsumed
399         */
400        public boolean isAdvisoryForConsumed() {
401            return advisoryForConsumed;
402        }
403    
404        /**
405         * @param advisoryForConsumed the advisoryForConsumed to set
406         */
407        public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
408            this.advisoryForConsumed = advisoryForConsumed;
409        }
410    
411        /**
412         * @return the advisdoryForFastProducers
413         */
414        public boolean isAdvisoryForFastProducers() {
415            return advisoryForFastProducers;
416        }
417    
418        /**
419         * @param advisoryForFastProducers the advisdoryForFastProducers to set
420         */
421        public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
422            this.advisoryForFastProducers = advisoryForFastProducers;
423        }
424    
425        public boolean isSendAdvisoryIfNoConsumers() {
426            return sendAdvisoryIfNoConsumers;
427        }
428    
429        public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
430            this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
431        }
432    
433        /**
434         * @return the dead letter strategy
435         */
436        public DeadLetterStrategy getDeadLetterStrategy() {
437            return deadLetterStrategy;
438        }
439    
440        /**
441         * set the dead letter strategy
442         *
443         * @param deadLetterStrategy
444         */
445        public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
446            this.deadLetterStrategy = deadLetterStrategy;
447        }
448    
449        public int getCursorMemoryHighWaterMark() {
450            return this.cursorMemoryHighWaterMark;
451        }
452    
453        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
454            this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
455        }
456    
457        /**
458         * called when message is consumed
459         *
460         * @param context
461         * @param messageReference
462         */
463        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
464            if (advisoryForConsumed) {
465                broker.messageConsumed(context, messageReference);
466            }
467        }
468    
469        /**
470         * Called when message is delivered to the broker
471         *
472         * @param context
473         * @param messageReference
474         */
475        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
476            if (advisoryForDelivery) {
477                broker.messageDelivered(context, messageReference);
478            }
479        }
480    
481        /**
482         * Called when a message is discarded - e.g. running low on memory This will
483         * happen only if the policy is enabled - e.g. non durable topics
484         *
485         * @param context
486         * @param messageReference
487         */
488        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
489            if (advisoryForDiscardingMessages) {
490                broker.messageDiscarded(context, sub, messageReference);
491            }
492        }
493    
494        /**
495         * Called when there is a slow consumer
496         *
497         * @param context
498         * @param subs
499         */
500        public void slowConsumer(ConnectionContext context, Subscription subs) {
501            if (advisoryForSlowConsumers) {
502                broker.slowConsumer(context, this, subs);
503            }
504            if (slowConsumerStrategy != null) {
505                slowConsumerStrategy.slowConsumer(context, subs);
506            }
507        }
508    
509        /**
510         * Called to notify a producer is too fast
511         *
512         * @param context
513         * @param producerInfo
514         */
515        public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
516            if (advisoryForFastProducers) {
517                broker.fastProducer(context, producerInfo, getActiveMQDestination());
518            }
519        }
520    
521        /**
522         * Called when a Usage reaches a limit
523         *
524         * @param context
525         * @param usage
526         */
527        public void isFull(ConnectionContext context, Usage<?> usage) {
528            if (advisoryWhenFull) {
529                broker.isFull(context, this, usage);
530            }
531        }
532    
533        public void dispose(ConnectionContext context) throws IOException {
534            if (this.store != null) {
535                this.store.removeAllMessages(context);
536                this.store.dispose(context);
537            }
538            this.destinationStatistics.setParent(null);
539            this.memoryUsage.stop();
540            this.disposed = true;
541        }
542    
543        public boolean isDisposed() {
544            return this.disposed;
545        }
546    
547        /**
548         * Provides a hook to allow messages with no consumer to be processed in
549         * some way - such as to send to a dead letter queue or something..
550         */
551        protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
552            if (!msg.isPersistent()) {
553                if (isSendAdvisoryIfNoConsumers()) {
554                    // allow messages with no consumers to be dispatched to a dead
555                    // letter queue
556                    if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
557    
558                        Message message = msg.copy();
559                        // The original destination and transaction id do not get
560                        // filled when the message is first sent,
561                        // it is only populated if the message is routed to another
562                        // destination like the DLQ
563                        if (message.getOriginalDestination() != null) {
564                            message.setOriginalDestination(message.getDestination());
565                        }
566                        if (message.getOriginalTransactionId() != null) {
567                            message.setOriginalTransactionId(message.getTransactionId());
568                        }
569    
570                        ActiveMQTopic advisoryTopic;
571                        if (destination.isQueue()) {
572                            advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
573                        } else {
574                            advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
575                        }
576                        message.setDestination(advisoryTopic);
577                        message.setTransactionId(null);
578    
579                        // Disable flow control for this since since we don't want
580                        // to block.
581                        boolean originalFlowControl = context.isProducerFlowControl();
582                        try {
583                            context.setProducerFlowControl(false);
584                            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
585                            producerExchange.setMutable(false);
586                            producerExchange.setConnectionContext(context);
587                            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
588                            context.getBroker().send(producerExchange, message);
589                        } finally {
590                            context.setProducerFlowControl(originalFlowControl);
591                        }
592    
593                    }
594                }
595            }
596        }
597    
598        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
599        }
600    
601        public final int getStoreUsageHighWaterMark() {
602            return this.storeUsageHighWaterMark;
603        }
604    
605        public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
606            this.storeUsageHighWaterMark = storeUsageHighWaterMark;
607        }
608    
609        protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
610            waitForSpace(context, usage, 100, warning);
611        }
612    
613        protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
614            if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
615                getLog().debug("sendFailIfNoSpace, forcing exception on send, usage:  " + usage + ": " + warning);
616                throw new ResourceAllocationException(warning);
617            }
618            if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
619                if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
620                    getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
621                    throw new ResourceAllocationException(warning);
622                }
623            } else {
624                long start = System.currentTimeMillis();
625                long nextWarn = start;
626                while (!usage.waitForSpace(1000, highWaterMark)) {
627                    if (context.getStopping().get()) {
628                        throw new IOException("Connection closed, send aborted.");
629                    }
630    
631                    long now = System.currentTimeMillis();
632                    if (now >= nextWarn) {
633                        getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
634                        nextWarn = now + blockedProducerWarningInterval;
635                    }
636                }
637            }
638        }
639    
640        protected abstract Logger getLog();
641    
642        public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
643            this.slowConsumerStrategy = slowConsumerStrategy;
644        }
645    
646        public SlowConsumerStrategy getSlowConsumerStrategy() {
647            return this.slowConsumerStrategy;
648        }
649    
650    
651        public boolean isPrioritizedMessages() {
652            return this.prioritizedMessages;
653        }
654    
655        public void setPrioritizedMessages(boolean prioritizedMessages) {
656            this.prioritizedMessages = prioritizedMessages;
657            if (store != null) {
658                store.setPrioritizedMessages(prioritizedMessages);
659            }
660        }
661    
662        /**
663         * @return the inactiveTimoutBeforeGC
664         */
665        public long getInactiveTimoutBeforeGC() {
666            return this.inactiveTimoutBeforeGC;
667        }
668    
669        /**
670         * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
671         */
672        public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
673            this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
674        }
675    
676        /**
677         * @return the gcIfInactive
678         */
679        public boolean isGcIfInactive() {
680            return this.gcIfInactive;
681        }
682    
683        /**
684         * @param gcIfInactive the gcIfInactive to set
685         */
686        public void setGcIfInactive(boolean gcIfInactive) {
687            this.gcIfInactive = gcIfInactive;
688        }
689    
690        /**
691         * Indicate if it is ok to gc destinations that have only network consumers
692         * @param gcWithNetworkConsumers
693         */
694        public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
695            this.gcWithNetworkConsumers = gcWithNetworkConsumers;
696        }
697    
698        public boolean isGcWithNetworkConsumers() {
699            return gcWithNetworkConsumers;
700        }
701    
702        public void markForGC(long timeStamp) {
703            if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
704                    && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
705                this.lastActiveTime = timeStamp;
706            }
707        }
708    
709        public boolean canGC() {
710            boolean result = false;
711            if (isGcIfInactive()&& this.lastActiveTime != 0l) {
712                if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
713                    result = true;
714                }
715            }
716            return result;
717        }
718    
719        public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
720            this.reduceMemoryFootprint = reduceMemoryFootprint;
721        }
722    
723        protected boolean isReduceMemoryFootprint() {
724            return this.reduceMemoryFootprint;
725        }
726    
727        public boolean isDoOptimzeMessageStorage() {
728            return doOptimzeMessageStorage;
729        }
730    
731        public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
732            this.doOptimzeMessageStorage = doOptimzeMessageStorage;
733        }
734    
735        public int getOptimizeMessageStoreInFlightLimit() {
736            return optimizeMessageStoreInFlightLimit;
737        }
738    
739        public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
740            this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
741        }
742    
743    
744        public abstract List<Subscription> getConsumers();
745    
746        protected boolean hasRegularConsumers(List<Subscription> consumers) {
747            boolean hasRegularConsumers = false;
748            for (Subscription subscription: consumers) {
749                if (!subscription.getConsumerInfo().isNetworkSubscription()) {
750                    hasRegularConsumers = true;
751                    break;
752                }
753            }
754            return hasRegularConsumers;
755        }
756    
757        protected ConnectionContext createConnectionContext() {
758            ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
759            answer.setBroker(this.broker);
760            answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
761            answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
762            return answer;
763        }
764    
765        protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
766            // the original ack may be a ranged ack, but we are trying to delete
767            // a specific
768            // message store here so we need to convert to a non ranged ack.
769            if (ack.getMessageCount() > 0) {
770                // Dup the ack
771                MessageAck a = new MessageAck();
772                ack.copy(a);
773                ack = a;
774                // Convert to non-ranged.
775                ack.setFirstMessageId(node.getMessageId());
776                ack.setLastMessageId(node.getMessageId());
777                ack.setMessageCount(1);
778            }
779            return ack;
780        }
781    
782    }