001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.List;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import javax.jms.ResourceAllocationException;
024
025import org.apache.activemq.advisory.AdvisorySupport;
026import org.apache.activemq.broker.Broker;
027import org.apache.activemq.broker.BrokerService;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.broker.ProducerBrokerExchange;
030import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
031import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQTopic;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageDispatchNotification;
037import org.apache.activemq.command.ProducerInfo;
038import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
039import org.apache.activemq.security.SecurityContext;
040import org.apache.activemq.state.ProducerState;
041import org.apache.activemq.store.MessageStore;
042import org.apache.activemq.thread.Scheduler;
043import org.apache.activemq.usage.MemoryUsage;
044import org.apache.activemq.usage.SystemUsage;
045import org.apache.activemq.usage.TempUsage;
046import org.apache.activemq.usage.Usage;
047import org.slf4j.Logger;
048
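/**
 * Abstract base class for {@link Destination} implementations. It holds the
 * configuration flags, usage trackers and statistics exposed by the accessors
 * below, together with shared helpers for advisory notification, producer
 * flow-control blocking and garbage collection of inactive destinations.
 */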
052public abstract class BaseDestination implements Destination {
    /**
     * The maximum number of messages to page in to the destination from
     * persistent storage
     */
    public static final int MAX_PAGE_SIZE = 200;
    /** Default browse page size: twice {@link #MAX_PAGE_SIZE}. Also the default for maxExpirePageSize. */
    public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
    /** Default value of expireMessagesPeriod (presumably milliseconds - confirm against users of the field). */
    public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
    /** Default inactivity timeout; canGC() compares it with a System.currentTimeMillis() difference. */
    public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
    // NOTE(review): the two audit constants below are not used for the
    // maxProducersToAudit / maxAuditDepth field defaults in this class (1024 / 2048).
    public static final int MAX_PRODUCERS_TO_AUDIT = 64;
    public static final int MAX_AUDIT_DEPTH = 10000;

    // Not read or written in this class; available to subclasses.
    protected final AtomicBoolean started = new AtomicBoolean();
    protected final ActiveMQDestination destination;
    // Result of brokerService.getBroker(); target of the advisory callbacks.
    protected final Broker broker;
    // May be null: every use in this class is guarded by a null check.
    protected final MessageStore store;
    protected SystemUsage systemUsage;
    protected MemoryUsage memoryUsage;
    private boolean producerFlowControl = true;
    private boolean alwaysRetroactive = false;
    // Time of the last blocked-producer log line, maintained by isFlowControlLogRequired().
    protected long lastBlockedProducerWarnTime = 0l;
    // DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL is not declared in this class
    // (presumably inherited from Destination - confirm).
    protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;

    private int maxProducersToAudit = 1024;
    private int maxAuditDepth = 2048;
    private boolean enableAudit = true;
    private int maxPageSize = MAX_PAGE_SIZE;
    private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
    private boolean useCache = true;
    private int minimumMessageSize = 1024;
    private boolean lazyDispatch = false;
    // Advisory switches: each one gates the matching broker callback in this class.
    private boolean advisoryForSlowConsumers;
    private boolean advisoryForFastProducers;
    private boolean advisoryForDiscardingMessages;
    private boolean advisoryWhenFull;
    private boolean advisoryForDelivery;
    private boolean advisoryForConsumed;
    private boolean sendAdvisoryIfNoConsumers;
    private boolean includeBodyForAdvisory;
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    protected final BrokerService brokerService;
    // Result of brokerService.getRegionBroker(); source of the broker sequence id.
    protected final Broker regionBroker;
    // DEFAULT_DEAD_LETTER_STRATEGY is not declared in this class
    // (presumably inherited from Destination - confirm).
    protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
    protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
    private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
    protected int cursorMemoryHighWaterMark = 70;
    protected int storeUsageHighWaterMark = 100;
    // Optional; slowConsumer() only invokes it when non-null.
    private SlowConsumerStrategy slowConsumerStrategy;
    private boolean prioritizedMessages;
    private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
    private boolean gcIfInactive;
    private boolean gcWithNetworkConsumers;
    // 0 means "not marked for GC"; otherwise the timestamp handed to markForGC().
    // Reset to 0 by producer/subscription changes and by messageDelivered().
    private long lastActiveTime=0l;
    private boolean reduceMemoryFootprint = false;
    protected final Scheduler scheduler;
    // Set to true at the end of dispose().
    private boolean disposed = false;
    private boolean doOptimzeMessageStorage = true;
    /*
     * percentage of in-flight messages above which optimize message store is disabled
     */
    private int optimizeMessageStoreInFlightLimit = 10;
    private boolean persistJMSRedelivered;
114
115    /**
116     * @param brokerService
117     * @param store
118     * @param destination
119     * @param parentStats
120     * @throws Exception
121     */
122    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
123        this.brokerService = brokerService;
124        this.broker = brokerService.getBroker();
125        this.store = store;
126        this.destination = destination;
127        // let's copy the enabled property from the parent DestinationStatistics
128        this.destinationStatistics.setEnabled(parentStats.isEnabled());
129        this.destinationStatistics.setParent(parentStats);
130        this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
131        this.memoryUsage = this.systemUsage.getMemoryUsage();
132        this.memoryUsage.setUsagePortion(1.0f);
133        this.regionBroker = brokerService.getRegionBroker();
134        this.scheduler = brokerService.getBroker().getScheduler();
135    }
136
137    /**
138     * initialize the destination
139     *
140     * @throws Exception
141     */
142    public void initialize() throws Exception {
143        // Let the store know what usage manager we are using so that he can
144        // flush messages to disk when usage gets high.
145        if (store != null) {
146            store.setMemoryUsage(this.memoryUsage);
147        }
148    }
149
150    /**
151     * @return the producerFlowControl
152     */
153    @Override
154    public boolean isProducerFlowControl() {
155        return producerFlowControl;
156    }
157
158    /**
159     * @param producerFlowControl the producerFlowControl to set
160     */
161    @Override
162    public void setProducerFlowControl(boolean producerFlowControl) {
163        this.producerFlowControl = producerFlowControl;
164    }
165
166    @Override
167    public boolean isAlwaysRetroactive() {
168        return alwaysRetroactive;
169    }
170
171    @Override
172    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
173        this.alwaysRetroactive = alwaysRetroactive;
174    }
175
176    /**
177     * Set's the interval at which warnings about producers being blocked by
178     * resource usage will be triggered. Values of 0 or less will disable
179     * warnings
180     *
181     * @param blockedProducerWarningInterval the interval at which warning about
182     *            blocked producers will be triggered.
183     */
184    @Override
185    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
186        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
187    }
188
189    /**
190     *
191     * @return the interval at which warning about blocked producers will be
192     *         triggered.
193     */
194    @Override
195    public long getBlockedProducerWarningInterval() {
196        return blockedProducerWarningInterval;
197    }
198
199    /**
200     * @return the maxProducersToAudit
201     */
202    @Override
203    public int getMaxProducersToAudit() {
204        return maxProducersToAudit;
205    }
206
207    /**
208     * @param maxProducersToAudit the maxProducersToAudit to set
209     */
210    @Override
211    public void setMaxProducersToAudit(int maxProducersToAudit) {
212        this.maxProducersToAudit = maxProducersToAudit;
213    }
214
215    /**
216     * @return the maxAuditDepth
217     */
218    @Override
219    public int getMaxAuditDepth() {
220        return maxAuditDepth;
221    }
222
223    /**
224     * @param maxAuditDepth the maxAuditDepth to set
225     */
226    @Override
227    public void setMaxAuditDepth(int maxAuditDepth) {
228        this.maxAuditDepth = maxAuditDepth;
229    }
230
231    /**
232     * @return the enableAudit
233     */
234    @Override
235    public boolean isEnableAudit() {
236        return enableAudit;
237    }
238
239    /**
240     * @param enableAudit the enableAudit to set
241     */
242    @Override
243    public void setEnableAudit(boolean enableAudit) {
244        this.enableAudit = enableAudit;
245    }
246
247    @Override
248    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
249        destinationStatistics.getProducers().increment();
250        this.lastActiveTime=0l;
251    }
252
253    @Override
254    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
255        destinationStatistics.getProducers().decrement();
256    }
257
258    @Override
259    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
260        destinationStatistics.getConsumers().increment();
261        this.lastActiveTime=0l;
262    }
263
264    @Override
265    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
266        destinationStatistics.getConsumers().decrement();
267        this.lastActiveTime=0l;
268    }
269
270
271    @Override
272    public final MemoryUsage getMemoryUsage() {
273        return memoryUsage;
274    }
275
276    @Override
277    public void setMemoryUsage(MemoryUsage memoryUsage) {
278        this.memoryUsage = memoryUsage;
279    }
280
281    @Override
282    public TempUsage getTempUsage() {
283        return systemUsage.getTempUsage();
284    }
285
286    @Override
287    public DestinationStatistics getDestinationStatistics() {
288        return destinationStatistics;
289    }
290
291    @Override
292    public ActiveMQDestination getActiveMQDestination() {
293        return destination;
294    }
295
296    @Override
297    public final String getName() {
298        return getActiveMQDestination().getPhysicalName();
299    }
300
301    @Override
302    public final MessageStore getMessageStore() {
303        return store;
304    }
305
306    @Override
307    public boolean isActive() {
308        boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
309                           destinationStatistics.getProducers().getCount() > 0;
310        if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) {
311            isActive = hasRegularConsumers(getConsumers());
312        }
313        return isActive;
314    }
315
316    @Override
317    public int getMaxPageSize() {
318        return maxPageSize;
319    }
320
321    @Override
322    public void setMaxPageSize(int maxPageSize) {
323        this.maxPageSize = maxPageSize;
324    }
325
326    @Override
327    public int getMaxBrowsePageSize() {
328        return this.maxBrowsePageSize;
329    }
330
331    @Override
332    public void setMaxBrowsePageSize(int maxPageSize) {
333        this.maxBrowsePageSize = maxPageSize;
334    }
335
336    public int getMaxExpirePageSize() {
337        return this.maxExpirePageSize;
338    }
339
340    public void setMaxExpirePageSize(int maxPageSize) {
341        this.maxExpirePageSize = maxPageSize;
342    }
343
344    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
345        this.expireMessagesPeriod = expireMessagesPeriod;
346    }
347
348    public long getExpireMessagesPeriod() {
349        return expireMessagesPeriod;
350    }
351
352    @Override
353    public boolean isUseCache() {
354        return useCache;
355    }
356
357    @Override
358    public void setUseCache(boolean useCache) {
359        this.useCache = useCache;
360    }
361
362    @Override
363    public int getMinimumMessageSize() {
364        return minimumMessageSize;
365    }
366
367    @Override
368    public void setMinimumMessageSize(int minimumMessageSize) {
369        this.minimumMessageSize = minimumMessageSize;
370    }
371
372    @Override
373    public boolean isLazyDispatch() {
374        return lazyDispatch;
375    }
376
377    @Override
378    public void setLazyDispatch(boolean lazyDispatch) {
379        this.lazyDispatch = lazyDispatch;
380    }
381
382    protected long getDestinationSequenceId() {
383        return regionBroker.getBrokerSequenceId();
384    }
385
386    /**
387     * @return the advisoryForSlowConsumers
388     */
389    public boolean isAdvisoryForSlowConsumers() {
390        return advisoryForSlowConsumers;
391    }
392
393    /**
394     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
395     */
396    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
397        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
398    }
399
400    /**
401     * @return the advisoryForDiscardingMessages
402     */
403    public boolean isAdvisoryForDiscardingMessages() {
404        return advisoryForDiscardingMessages;
405    }
406
407    /**
408     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
409     *            set
410     */
411    public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
412        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
413    }
414
415    /**
416     * @return the advisoryWhenFull
417     */
418    public boolean isAdvisoryWhenFull() {
419        return advisoryWhenFull;
420    }
421
422    /**
423     * @param advisoryWhenFull the advisoryWhenFull to set
424     */
425    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
426        this.advisoryWhenFull = advisoryWhenFull;
427    }
428
429    /**
430     * @return the advisoryForDelivery
431     */
432    public boolean isAdvisoryForDelivery() {
433        return advisoryForDelivery;
434    }
435
436    /**
437     * @param advisoryForDelivery the advisoryForDelivery to set
438     */
439    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
440        this.advisoryForDelivery = advisoryForDelivery;
441    }
442
443    /**
444     * @return the advisoryForConsumed
445     */
446    public boolean isAdvisoryForConsumed() {
447        return advisoryForConsumed;
448    }
449
450    /**
451     * @param advisoryForConsumed the advisoryForConsumed to set
452     */
453    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
454        this.advisoryForConsumed = advisoryForConsumed;
455    }
456
457    /**
458     * @return the advisdoryForFastProducers
459     */
460    public boolean isAdvisoryForFastProducers() {
461        return advisoryForFastProducers;
462    }
463
464    /**
465     * @param advisoryForFastProducers the advisdoryForFastProducers to set
466     */
467    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
468        this.advisoryForFastProducers = advisoryForFastProducers;
469    }
470
471    public boolean isSendAdvisoryIfNoConsumers() {
472        return sendAdvisoryIfNoConsumers;
473    }
474
475    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
476        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
477    }
478
479    public boolean isIncludeBodyForAdvisory() {
480        return includeBodyForAdvisory;
481    }
482
483    public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
484        this.includeBodyForAdvisory = includeBodyForAdvisory;
485    }
486
487    /**
488     * @return the dead letter strategy
489     */
490    @Override
491    public DeadLetterStrategy getDeadLetterStrategy() {
492        return deadLetterStrategy;
493    }
494
495    /**
496     * set the dead letter strategy
497     *
498     * @param deadLetterStrategy
499     */
500    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
501        this.deadLetterStrategy = deadLetterStrategy;
502    }
503
504    @Override
505    public int getCursorMemoryHighWaterMark() {
506        return this.cursorMemoryHighWaterMark;
507    }
508
509    @Override
510    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
511        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
512    }
513
514    /**
515     * called when message is consumed
516     *
517     * @param context
518     * @param messageReference
519     */
520    @Override
521    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
522        if (advisoryForConsumed) {
523            broker.messageConsumed(context, messageReference);
524        }
525    }
526
527    /**
528     * Called when message is delivered to the broker
529     *
530     * @param context
531     * @param messageReference
532     */
533    @Override
534    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
535        this.lastActiveTime = 0L;
536        if (advisoryForDelivery) {
537            broker.messageDelivered(context, messageReference);
538        }
539    }
540
541    /**
542     * Called when a message is discarded - e.g. running low on memory This will
543     * happen only if the policy is enabled - e.g. non durable topics
544     *
545     * @param context
546     * @param messageReference
547     */
548    @Override
549    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
550        if (advisoryForDiscardingMessages) {
551            broker.messageDiscarded(context, sub, messageReference);
552        }
553    }
554
555    /**
556     * Called when there is a slow consumer
557     *
558     * @param context
559     * @param subs
560     */
561    @Override
562    public void slowConsumer(ConnectionContext context, Subscription subs) {
563        if (advisoryForSlowConsumers) {
564            broker.slowConsumer(context, this, subs);
565        }
566        if (slowConsumerStrategy != null) {
567            slowConsumerStrategy.slowConsumer(context, subs);
568        }
569    }
570
571    /**
572     * Called to notify a producer is too fast
573     *
574     * @param context
575     * @param producerInfo
576     */
577    @Override
578    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
579        if (advisoryForFastProducers) {
580            broker.fastProducer(context, producerInfo, getActiveMQDestination());
581        }
582    }
583
584    /**
585     * Called when a Usage reaches a limit
586     *
587     * @param context
588     * @param usage
589     */
590    @Override
591    public void isFull(ConnectionContext context, Usage<?> usage) {
592        if (advisoryWhenFull) {
593            broker.isFull(context, this, usage);
594        }
595    }
596
597    @Override
598    public void dispose(ConnectionContext context) throws IOException {
599        if (this.store != null) {
600            this.store.removeAllMessages(context);
601            this.store.dispose(context);
602        }
603        this.destinationStatistics.setParent(null);
604        this.memoryUsage.stop();
605        this.disposed = true;
606    }
607
608    @Override
609    public boolean isDisposed() {
610        return this.disposed;
611    }
612
613    /**
614     * Provides a hook to allow messages with no consumer to be processed in
615     * some way - such as to send to a dead letter queue or something..
616     */
617    protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
618        if (!msg.isPersistent()) {
619            if (isSendAdvisoryIfNoConsumers()) {
620                // allow messages with no consumers to be dispatched to a dead
621                // letter queue
622                if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
623
624                    Message message = msg.copy();
625                    // The original destination and transaction id do not get
626                    // filled when the message is first sent,
627                    // it is only populated if the message is routed to another
628                    // destination like the DLQ
629                    if (message.getOriginalDestination() != null) {
630                        message.setOriginalDestination(message.getDestination());
631                    }
632                    if (message.getOriginalTransactionId() != null) {
633                        message.setOriginalTransactionId(message.getTransactionId());
634                    }
635
636                    ActiveMQTopic advisoryTopic;
637                    if (destination.isQueue()) {
638                        advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
639                    } else {
640                        advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
641                    }
642                    message.setDestination(advisoryTopic);
643                    message.setTransactionId(null);
644
645                    // Disable flow control for this since since we don't want
646                    // to block.
647                    boolean originalFlowControl = context.isProducerFlowControl();
648                    try {
649                        context.setProducerFlowControl(false);
650                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
651                        producerExchange.setMutable(false);
652                        producerExchange.setConnectionContext(context);
653                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
654                        context.getBroker().send(producerExchange, message);
655                    } finally {
656                        context.setProducerFlowControl(originalFlowControl);
657                    }
658
659                }
660            }
661        }
662    }
663
    /**
     * Base implementation: intentionally does nothing with the notification.
     *
     * @param messageDispatchNotification the notification (ignored here)
     */
    @Override
    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
    }
667
668    public final int getStoreUsageHighWaterMark() {
669        return this.storeUsageHighWaterMark;
670    }
671
672    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
673        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
674    }
675
676    protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
677        waitForSpace(context, producerBrokerExchange, usage, 100, warning);
678    }
679
680    protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
681        if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
682            getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning);
683            throw new ResourceAllocationException(warning);
684        }
685        if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
686            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
687                getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning);
688                throw new ResourceAllocationException(warning);
689            }
690        } else {
691            long start = System.currentTimeMillis();
692            producerBrokerExchange.blockingOnFlowControl(true);
693            destinationStatistics.getBlockedSends().increment();
694            while (!usage.waitForSpace(1000, highWaterMark)) {
695                if (context.getStopping().get()) {
696                    throw new IOException("Connection closed, send aborted.");
697                }
698
699                if (isFlowControlLogRequired()) {
700                    getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))});
701                }
702            }
703            long finish = System.currentTimeMillis();
704            long totalTimeBlocked = finish - start;
705            destinationStatistics.getBlockedTime().addTime(totalTimeBlocked);
706            producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked);
707            producerBrokerExchange.blockingOnFlowControl(false);
708        }
709    }
710
711    protected boolean isFlowControlLogRequired() {
712        boolean answer = false;
713        if (blockedProducerWarningInterval > 0) {
714            long now = System.currentTimeMillis();
715            if (lastBlockedProducerWarnTime + blockedProducerWarningInterval <= now) {
716                lastBlockedProducerWarnTime = now;
717                answer = true;
718            }
719        }
720        return answer;
721    }
722
    /**
     * @return the logger this class writes its debug, info, warn and error output to
     */
    protected abstract Logger getLog();
724
725    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
726        this.slowConsumerStrategy = slowConsumerStrategy;
727    }
728
729    @Override
730    public SlowConsumerStrategy getSlowConsumerStrategy() {
731        return this.slowConsumerStrategy;
732    }
733
734
735    @Override
736    public boolean isPrioritizedMessages() {
737        return this.prioritizedMessages;
738    }
739
740    public void setPrioritizedMessages(boolean prioritizedMessages) {
741        this.prioritizedMessages = prioritizedMessages;
742        if (store != null) {
743            store.setPrioritizedMessages(prioritizedMessages);
744        }
745    }
746
747    /**
748     * @return the inactiveTimeoutBeforeGC
749     */
750    @Override
751    public long getInactiveTimeoutBeforeGC() {
752        return this.inactiveTimeoutBeforeGC;
753    }
754
755    /**
756     * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set
757     */
758    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
759        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
760    }
761
762    /**
763     * @return the gcIfInactive
764     */
765    public boolean isGcIfInactive() {
766        return this.gcIfInactive;
767    }
768
769    /**
770     * @param gcIfInactive the gcIfInactive to set
771     */
772    public void setGcIfInactive(boolean gcIfInactive) {
773        this.gcIfInactive = gcIfInactive;
774    }
775
776    /**
777     * Indicate if it is ok to gc destinations that have only network consumers
778     * @param gcWithNetworkConsumers
779     */
780    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
781        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
782    }
783
784    public boolean isGcWithNetworkConsumers() {
785        return gcWithNetworkConsumers;
786    }
787
788    @Override
789    public void markForGC(long timeStamp) {
790        if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
791                && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
792            this.lastActiveTime = timeStamp;
793        }
794    }
795
796    @Override
797    public boolean canGC() {
798        boolean result = false;
799        final long currentLastActiveTime = this.lastActiveTime;
800        if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) {
801            if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
802                result = true;
803            }
804        }
805        return result;
806    }
807
808    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
809        this.reduceMemoryFootprint = reduceMemoryFootprint;
810    }
811
812    public boolean isReduceMemoryFootprint() {
813        return this.reduceMemoryFootprint;
814    }
815
816    @Override
817    public boolean isDoOptimzeMessageStorage() {
818        return doOptimzeMessageStorage;
819    }
820
821    @Override
822    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
823        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
824    }
825
826    public int getOptimizeMessageStoreInFlightLimit() {
827        return optimizeMessageStoreInFlightLimit;
828    }
829
830    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
831        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
832    }
833
834
    /**
     * @return the subscriptions of this destination; {@link #isActive()} passes
     *         this list to {@link #hasRegularConsumers(List)}
     */
    @Override
    public abstract List<Subscription> getConsumers();
837
838    protected boolean hasRegularConsumers(List<Subscription> consumers) {
839        boolean hasRegularConsumers = false;
840        for (Subscription subscription: consumers) {
841            if (!subscription.getConsumerInfo().isNetworkSubscription()) {
842                hasRegularConsumers = true;
843                break;
844            }
845        }
846        return hasRegularConsumers;
847    }
848
849    public ConnectionContext createConnectionContext() {
850        ConnectionContext answer = new ConnectionContext();
851        answer.setBroker(this.broker);
852        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
853        answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
854        return answer;
855    }
856
857    protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
858        // the original ack may be a ranged ack, but we are trying to delete
859        // a specific
860        // message store here so we need to convert to a non ranged ack.
861        if (ack.getMessageCount() > 0) {
862            // Dup the ack
863            MessageAck a = new MessageAck();
864            ack.copy(a);
865            ack = a;
866            // Convert to non-ranged.
867            ack.setMessageCount(1);
868        }
869        // always use node messageId so we can access entry/data Location
870        ack.setFirstMessageId(node.getMessageId());
871        ack.setLastMessageId(node.getMessageId());
872        return ack;
873    }
874
875    protected boolean isDLQ() {
876        return destination.isDLQ();
877    }
878
879    @Override
880    public void duplicateFromStore(Message message, Subscription durableSub) {
881        ConnectionContext connectionContext = createConnectionContext();
882        getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
883        Throwable cause = new Throwable("duplicate from store for " + destination);
884        message.setRegionDestination(this);
885        broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
886        MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
887        messageAck.setPoisonCause(cause);
888        try {
889            acknowledge(connectionContext, durableSub, messageAck, message);
890        } catch (IOException e) {
891            getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck);
892        }
893    }
894
895    public void setPersistJMSRedelivered(boolean persistJMSRedelivered) {
896        this.persistJMSRedelivered = persistJMSRedelivered;
897    }
898
899    public boolean isPersistJMSRedelivered() {
900        return persistJMSRedelivered;
901    }
902
903    public SystemUsage getSystemUsage() {
904        return systemUsage;
905    }
906}