001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.List;
021
022import javax.jms.ResourceAllocationException;
023
024import org.apache.activemq.advisory.AdvisorySupport;
025import org.apache.activemq.broker.Broker;
026import org.apache.activemq.broker.BrokerService;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.broker.ProducerBrokerExchange;
029import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
030import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ActiveMQTopic;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageDispatchNotification;
036import org.apache.activemq.command.ProducerInfo;
037import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
038import org.apache.activemq.security.SecurityContext;
039import org.apache.activemq.state.ProducerState;
040import org.apache.activemq.store.MessageStore;
041import org.apache.activemq.thread.Scheduler;
042import org.apache.activemq.usage.MemoryUsage;
043import org.apache.activemq.usage.SystemUsage;
044import org.apache.activemq.usage.Usage;
045import org.slf4j.Logger;
046
047/**
048 *
049 */
050public abstract class BaseDestination implements Destination {
051    /**
052     * The maximum number of messages to page in to the destination from
053     * persistent storage
054     */
055    public static final int MAX_PAGE_SIZE = 200;
056    public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
057    public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
058    public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
059    public static final int MAX_PRODUCERS_TO_AUDIT = 64;
060    public static final int MAX_AUDIT_DEPTH = 10000;
061
062    protected final ActiveMQDestination destination;
063    protected final Broker broker;
064    protected final MessageStore store;
065    protected SystemUsage systemUsage;
066    protected MemoryUsage memoryUsage;
067    private boolean producerFlowControl = true;
068    private boolean alwaysRetroactive = false;
069    protected boolean warnOnProducerFlowControl = true;
070    protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
071
072    private int maxProducersToAudit = 1024;
073    private int maxAuditDepth = 2048;
074    private boolean enableAudit = true;
075    private int maxPageSize = MAX_PAGE_SIZE;
076    private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
077    private boolean useCache = true;
078    private int minimumMessageSize = 1024;
079    private boolean lazyDispatch = false;
080    private boolean advisoryForSlowConsumers;
081    private boolean advisoryForFastProducers;
082    private boolean advisoryForDiscardingMessages;
083    private boolean advisoryWhenFull;
084    private boolean advisoryForDelivery;
085    private boolean advisoryForConsumed;
086    private boolean sendAdvisoryIfNoConsumers;
087    private boolean includeBodyForAdvisory;
088    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
089    protected final BrokerService brokerService;
090    protected final Broker regionBroker;
091    protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
092    protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
093    private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
094    protected int cursorMemoryHighWaterMark = 70;
095    protected int storeUsageHighWaterMark = 100;
096    private SlowConsumerStrategy slowConsumerStrategy;
097    private boolean prioritizedMessages;
098    private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
099    private boolean gcIfInactive;
100    private boolean gcWithNetworkConsumers;
101    private long lastActiveTime=0l;
102    private boolean reduceMemoryFootprint = false;
103    protected final Scheduler scheduler;
104    private boolean disposed = false;
105    private boolean doOptimzeMessageStorage = true;
106    /*
107     * percentage of in-flight messages above which optimize message store is disabled
108     */
109    private int optimizeMessageStoreInFlightLimit = 10;
110    private boolean persistJMSRedelivered;
111
112    /**
113     * @param brokerService
114     * @param store
115     * @param destination
116     * @param parentStats
117     * @throws Exception
118     */
119    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
120        this.brokerService = brokerService;
121        this.broker = brokerService.getBroker();
122        this.store = store;
123        this.destination = destination;
124        // let's copy the enabled property from the parent DestinationStatistics
125        this.destinationStatistics.setEnabled(parentStats.isEnabled());
126        this.destinationStatistics.setParent(parentStats);
127        this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
128        this.memoryUsage = this.systemUsage.getMemoryUsage();
129        this.memoryUsage.setUsagePortion(1.0f);
130        this.regionBroker = brokerService.getRegionBroker();
131        this.scheduler = brokerService.getBroker().getScheduler();
132    }
133
134    /**
135     * initialize the destination
136     *
137     * @throws Exception
138     */
139    public void initialize() throws Exception {
140        // Let the store know what usage manager we are using so that he can
141        // flush messages to disk when usage gets high.
142        if (store != null) {
143            store.setMemoryUsage(this.memoryUsage);
144        }
145    }
146
147    /**
148     * @return the producerFlowControl
149     */
150    @Override
151    public boolean isProducerFlowControl() {
152        return producerFlowControl;
153    }
154
155    /**
156     * @param producerFlowControl the producerFlowControl to set
157     */
158    @Override
159    public void setProducerFlowControl(boolean producerFlowControl) {
160        this.producerFlowControl = producerFlowControl;
161    }
162
163    @Override
164    public boolean isAlwaysRetroactive() {
165        return alwaysRetroactive;
166    }
167
168    @Override
169    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
170        this.alwaysRetroactive = alwaysRetroactive;
171    }
172
173    /**
174     * Set's the interval at which warnings about producers being blocked by
175     * resource usage will be triggered. Values of 0 or less will disable
176     * warnings
177     *
178     * @param blockedProducerWarningInterval the interval at which warning about
179     *            blocked producers will be triggered.
180     */
181    @Override
182    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
183        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
184    }
185
186    /**
187     *
188     * @return the interval at which warning about blocked producers will be
189     *         triggered.
190     */
191    @Override
192    public long getBlockedProducerWarningInterval() {
193        return blockedProducerWarningInterval;
194    }
195
196    /**
197     * @return the maxProducersToAudit
198     */
199    @Override
200    public int getMaxProducersToAudit() {
201        return maxProducersToAudit;
202    }
203
204    /**
205     * @param maxProducersToAudit the maxProducersToAudit to set
206     */
207    @Override
208    public void setMaxProducersToAudit(int maxProducersToAudit) {
209        this.maxProducersToAudit = maxProducersToAudit;
210    }
211
212    /**
213     * @return the maxAuditDepth
214     */
215    @Override
216    public int getMaxAuditDepth() {
217        return maxAuditDepth;
218    }
219
220    /**
221     * @param maxAuditDepth the maxAuditDepth to set
222     */
223    @Override
224    public void setMaxAuditDepth(int maxAuditDepth) {
225        this.maxAuditDepth = maxAuditDepth;
226    }
227
228    /**
229     * @return the enableAudit
230     */
231    @Override
232    public boolean isEnableAudit() {
233        return enableAudit;
234    }
235
236    /**
237     * @param enableAudit the enableAudit to set
238     */
239    @Override
240    public void setEnableAudit(boolean enableAudit) {
241        this.enableAudit = enableAudit;
242    }
243
244    @Override
245    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
246        destinationStatistics.getProducers().increment();
247        this.lastActiveTime=0l;
248    }
249
250    @Override
251    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
252        destinationStatistics.getProducers().decrement();
253    }
254
255    @Override
256    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
257        destinationStatistics.getConsumers().increment();
258        this.lastActiveTime=0l;
259    }
260
261    @Override
262    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
263        destinationStatistics.getConsumers().decrement();
264    }
265
266
267    @Override
268    public final MemoryUsage getMemoryUsage() {
269        return memoryUsage;
270    }
271
272    @Override
273    public void setMemoryUsage(MemoryUsage memoryUsage) {
274        this.memoryUsage = memoryUsage;
275    }
276
277    @Override
278    public DestinationStatistics getDestinationStatistics() {
279        return destinationStatistics;
280    }
281
282    @Override
283    public ActiveMQDestination getActiveMQDestination() {
284        return destination;
285    }
286
287    @Override
288    public final String getName() {
289        return getActiveMQDestination().getPhysicalName();
290    }
291
292    @Override
293    public final MessageStore getMessageStore() {
294        return store;
295    }
296
297    @Override
298    public boolean isActive() {
299        boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
300                           destinationStatistics.getProducers().getCount() != 0;
301        if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
302            isActive = hasRegularConsumers(getConsumers());
303        }
304        return isActive;
305    }
306
307    @Override
308    public int getMaxPageSize() {
309        return maxPageSize;
310    }
311
312    @Override
313    public void setMaxPageSize(int maxPageSize) {
314        this.maxPageSize = maxPageSize;
315    }
316
317    @Override
318    public int getMaxBrowsePageSize() {
319        return this.maxBrowsePageSize;
320    }
321
322    @Override
323    public void setMaxBrowsePageSize(int maxPageSize) {
324        this.maxBrowsePageSize = maxPageSize;
325    }
326
327    public int getMaxExpirePageSize() {
328        return this.maxExpirePageSize;
329    }
330
331    public void setMaxExpirePageSize(int maxPageSize) {
332        this.maxExpirePageSize = maxPageSize;
333    }
334
335    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
336        this.expireMessagesPeriod = expireMessagesPeriod;
337    }
338
339    public long getExpireMessagesPeriod() {
340        return expireMessagesPeriod;
341    }
342
343    @Override
344    public boolean isUseCache() {
345        return useCache;
346    }
347
348    @Override
349    public void setUseCache(boolean useCache) {
350        this.useCache = useCache;
351    }
352
353    @Override
354    public int getMinimumMessageSize() {
355        return minimumMessageSize;
356    }
357
358    @Override
359    public void setMinimumMessageSize(int minimumMessageSize) {
360        this.minimumMessageSize = minimumMessageSize;
361    }
362
363    @Override
364    public boolean isLazyDispatch() {
365        return lazyDispatch;
366    }
367
368    @Override
369    public void setLazyDispatch(boolean lazyDispatch) {
370        this.lazyDispatch = lazyDispatch;
371    }
372
373    protected long getDestinationSequenceId() {
374        return regionBroker.getBrokerSequenceId();
375    }
376
377    /**
378     * @return the advisoryForSlowConsumers
379     */
380    public boolean isAdvisoryForSlowConsumers() {
381        return advisoryForSlowConsumers;
382    }
383
384    /**
385     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
386     */
387    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
388        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
389    }
390
391    /**
392     * @return the advisoryForDiscardingMessages
393     */
394    public boolean isAdvisoryForDiscardingMessages() {
395        return advisoryForDiscardingMessages;
396    }
397
398    /**
399     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
400     *            set
401     */
402    public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
403        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
404    }
405
406    /**
407     * @return the advisoryWhenFull
408     */
409    public boolean isAdvisoryWhenFull() {
410        return advisoryWhenFull;
411    }
412
413    /**
414     * @param advisoryWhenFull the advisoryWhenFull to set
415     */
416    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
417        this.advisoryWhenFull = advisoryWhenFull;
418    }
419
420    /**
421     * @return the advisoryForDelivery
422     */
423    public boolean isAdvisoryForDelivery() {
424        return advisoryForDelivery;
425    }
426
427    /**
428     * @param advisoryForDelivery the advisoryForDelivery to set
429     */
430    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
431        this.advisoryForDelivery = advisoryForDelivery;
432    }
433
434    /**
435     * @return the advisoryForConsumed
436     */
437    public boolean isAdvisoryForConsumed() {
438        return advisoryForConsumed;
439    }
440
441    /**
442     * @param advisoryForConsumed the advisoryForConsumed to set
443     */
444    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
445        this.advisoryForConsumed = advisoryForConsumed;
446    }
447
448    /**
449     * @return the advisdoryForFastProducers
450     */
451    public boolean isAdvisoryForFastProducers() {
452        return advisoryForFastProducers;
453    }
454
455    /**
456     * @param advisoryForFastProducers the advisdoryForFastProducers to set
457     */
458    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
459        this.advisoryForFastProducers = advisoryForFastProducers;
460    }
461
462    public boolean isSendAdvisoryIfNoConsumers() {
463        return sendAdvisoryIfNoConsumers;
464    }
465
466    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
467        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
468    }
469
470    public boolean isIncludeBodyForAdvisory() {
471        return includeBodyForAdvisory;
472    }
473
474    public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
475        this.includeBodyForAdvisory = includeBodyForAdvisory;
476    }
477
478    /**
479     * @return the dead letter strategy
480     */
481    @Override
482    public DeadLetterStrategy getDeadLetterStrategy() {
483        return deadLetterStrategy;
484    }
485
486    /**
487     * set the dead letter strategy
488     *
489     * @param deadLetterStrategy
490     */
491    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
492        this.deadLetterStrategy = deadLetterStrategy;
493    }
494
495    @Override
496    public int getCursorMemoryHighWaterMark() {
497        return this.cursorMemoryHighWaterMark;
498    }
499
500    @Override
501    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
502        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
503    }
504
505    /**
506     * called when message is consumed
507     *
508     * @param context
509     * @param messageReference
510     */
511    @Override
512    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
513        if (advisoryForConsumed) {
514            broker.messageConsumed(context, messageReference);
515        }
516    }
517
518    /**
519     * Called when message is delivered to the broker
520     *
521     * @param context
522     * @param messageReference
523     */
524    @Override
525    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
526        this.lastActiveTime = 0L;
527        if (advisoryForDelivery) {
528            broker.messageDelivered(context, messageReference);
529        }
530    }
531
532    /**
533     * Called when a message is discarded - e.g. running low on memory This will
534     * happen only if the policy is enabled - e.g. non durable topics
535     *
536     * @param context
537     * @param messageReference
538     */
539    @Override
540    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
541        if (advisoryForDiscardingMessages) {
542            broker.messageDiscarded(context, sub, messageReference);
543        }
544    }
545
546    /**
547     * Called when there is a slow consumer
548     *
549     * @param context
550     * @param subs
551     */
552    @Override
553    public void slowConsumer(ConnectionContext context, Subscription subs) {
554        if (advisoryForSlowConsumers) {
555            broker.slowConsumer(context, this, subs);
556        }
557        if (slowConsumerStrategy != null) {
558            slowConsumerStrategy.slowConsumer(context, subs);
559        }
560    }
561
562    /**
563     * Called to notify a producer is too fast
564     *
565     * @param context
566     * @param producerInfo
567     */
568    @Override
569    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
570        if (advisoryForFastProducers) {
571            broker.fastProducer(context, producerInfo, getActiveMQDestination());
572        }
573    }
574
575    /**
576     * Called when a Usage reaches a limit
577     *
578     * @param context
579     * @param usage
580     */
581    @Override
582    public void isFull(ConnectionContext context, Usage<?> usage) {
583        if (advisoryWhenFull) {
584            broker.isFull(context, this, usage);
585        }
586    }
587
588    @Override
589    public void dispose(ConnectionContext context) throws IOException {
590        if (this.store != null) {
591            this.store.removeAllMessages(context);
592            this.store.dispose(context);
593        }
594        this.destinationStatistics.setParent(null);
595        this.memoryUsage.stop();
596        this.disposed = true;
597    }
598
599    @Override
600    public boolean isDisposed() {
601        return this.disposed;
602    }
603
604    /**
605     * Provides a hook to allow messages with no consumer to be processed in
606     * some way - such as to send to a dead letter queue or something..
607     */
608    protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
609        if (!msg.isPersistent()) {
610            if (isSendAdvisoryIfNoConsumers()) {
611                // allow messages with no consumers to be dispatched to a dead
612                // letter queue
613                if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
614
615                    Message message = msg.copy();
616                    // The original destination and transaction id do not get
617                    // filled when the message is first sent,
618                    // it is only populated if the message is routed to another
619                    // destination like the DLQ
620                    if (message.getOriginalDestination() != null) {
621                        message.setOriginalDestination(message.getDestination());
622                    }
623                    if (message.getOriginalTransactionId() != null) {
624                        message.setOriginalTransactionId(message.getTransactionId());
625                    }
626
627                    ActiveMQTopic advisoryTopic;
628                    if (destination.isQueue()) {
629                        advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
630                    } else {
631                        advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
632                    }
633                    message.setDestination(advisoryTopic);
634                    message.setTransactionId(null);
635
636                    // Disable flow control for this since since we don't want
637                    // to block.
638                    boolean originalFlowControl = context.isProducerFlowControl();
639                    try {
640                        context.setProducerFlowControl(false);
641                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
642                        producerExchange.setMutable(false);
643                        producerExchange.setConnectionContext(context);
644                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
645                        context.getBroker().send(producerExchange, message);
646                    } finally {
647                        context.setProducerFlowControl(originalFlowControl);
648                    }
649
650                }
651            }
652        }
653    }
654
655    @Override
656    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
657    }
658
659    public final int getStoreUsageHighWaterMark() {
660        return this.storeUsageHighWaterMark;
661    }
662
663    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
664        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
665    }
666
667    protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
668        waitForSpace(context, producerBrokerExchange, usage, 100, warning);
669    }
670
671    protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
672        if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
673            getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning);
674            throw new ResourceAllocationException(warning);
675        }
676        if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
677            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
678                getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning);
679                throw new ResourceAllocationException(warning);
680            }
681        } else {
682            long start = System.currentTimeMillis();
683            long nextWarn = start;
684            producerBrokerExchange.blockingOnFlowControl(true);
685            destinationStatistics.getBlockedSends().increment();
686            while (!usage.waitForSpace(1000, highWaterMark)) {
687                if (context.getStopping().get()) {
688                    throw new IOException("Connection closed, send aborted.");
689                }
690
691                long now = System.currentTimeMillis();
692                if (now >= nextWarn) {
693                    getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((now - start) / 1000))});
694                    nextWarn = now + blockedProducerWarningInterval;
695                }
696            }
697            long finish = System.currentTimeMillis();
698            long totalTimeBlocked = finish - start;
699            destinationStatistics.getBlockedTime().addTime(totalTimeBlocked);
700            producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked);
701            producerBrokerExchange.blockingOnFlowControl(false);
702        }
703    }
704
705    protected abstract Logger getLog();
706
707    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
708        this.slowConsumerStrategy = slowConsumerStrategy;
709    }
710
711    @Override
712    public SlowConsumerStrategy getSlowConsumerStrategy() {
713        return this.slowConsumerStrategy;
714    }
715
716
717    @Override
718    public boolean isPrioritizedMessages() {
719        return this.prioritizedMessages;
720    }
721
722    public void setPrioritizedMessages(boolean prioritizedMessages) {
723        this.prioritizedMessages = prioritizedMessages;
724        if (store != null) {
725            store.setPrioritizedMessages(prioritizedMessages);
726        }
727    }
728
729    /**
730     * @return the inactiveTimeoutBeforeGC
731     */
732    @Override
733    public long getInactiveTimeoutBeforeGC() {
734        return this.inactiveTimeoutBeforeGC;
735    }
736
737    /**
738     * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set
739     */
740    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
741        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
742    }
743
744    /**
745     * @return the gcIfInactive
746     */
747    public boolean isGcIfInactive() {
748        return this.gcIfInactive;
749    }
750
751    /**
752     * @param gcIfInactive the gcIfInactive to set
753     */
754    public void setGcIfInactive(boolean gcIfInactive) {
755        this.gcIfInactive = gcIfInactive;
756    }
757
758    /**
759     * Indicate if it is ok to gc destinations that have only network consumers
760     * @param gcWithNetworkConsumers
761     */
762    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
763        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
764    }
765
766    public boolean isGcWithNetworkConsumers() {
767        return gcWithNetworkConsumers;
768    }
769
770    @Override
771    public void markForGC(long timeStamp) {
772        if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
773                && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
774            this.lastActiveTime = timeStamp;
775        }
776    }
777
778    @Override
779    public boolean canGC() {
780        boolean result = false;
781        if (isGcIfInactive() && this.lastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) {
782            if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimeoutBeforeGC()) {
783                result = true;
784            }
785        }
786        return result;
787    }
788
789    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
790        this.reduceMemoryFootprint = reduceMemoryFootprint;
791    }
792
793    public boolean isReduceMemoryFootprint() {
794        return this.reduceMemoryFootprint;
795    }
796
797    @Override
798    public boolean isDoOptimzeMessageStorage() {
799        return doOptimzeMessageStorage;
800    }
801
802    @Override
803    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
804        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
805    }
806
807    public int getOptimizeMessageStoreInFlightLimit() {
808        return optimizeMessageStoreInFlightLimit;
809    }
810
811    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
812        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
813    }
814
815
816    @Override
817    public abstract List<Subscription> getConsumers();
818
819    protected boolean hasRegularConsumers(List<Subscription> consumers) {
820        boolean hasRegularConsumers = false;
821        for (Subscription subscription: consumers) {
822            if (!subscription.getConsumerInfo().isNetworkSubscription()) {
823                hasRegularConsumers = true;
824                break;
825            }
826        }
827        return hasRegularConsumers;
828    }
829
830    public ConnectionContext createConnectionContext() {
831        ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
832        answer.setBroker(this.broker);
833        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
834        answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
835        return answer;
836    }
837
838    protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
839        // the original ack may be a ranged ack, but we are trying to delete
840        // a specific
841        // message store here so we need to convert to a non ranged ack.
842        if (ack.getMessageCount() > 0) {
843            // Dup the ack
844            MessageAck a = new MessageAck();
845            ack.copy(a);
846            ack = a;
847            // Convert to non-ranged.
848            ack.setMessageCount(1);
849        }
850        // always use node messageId so we can access entry/data Location
851        ack.setFirstMessageId(node.getMessageId());
852        ack.setLastMessageId(node.getMessageId());
853        return ack;
854    }
855
856    protected boolean isDLQ() {
857        return destination.isDLQ();
858    }
859
860    @Override
861    public void duplicateFromStore(Message message, Subscription durableSub) {
862        ConnectionContext connectionContext = createConnectionContext();
863        getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
864        Throwable cause = new Throwable("duplicate from store for " + destination);
865        message.setRegionDestination(this);
866        broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
867        MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
868        messageAck.setPoisonCause(cause);
869        try {
870            acknowledge(connectionContext, durableSub, messageAck, message);
871        } catch (IOException e) {
872            getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck);
873        }
874    }
875
876    public void setPersistJMSRedelivered(boolean persistJMSRedelivered) {
877        this.persistJMSRedelivered = persistJMSRedelivered;
878    }
879
880    public boolean isPersistJMSRedelivered() {
881        return persistJMSRedelivered;
882    }
883}