001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.List;
021 import javax.jms.ResourceAllocationException;
022 import org.apache.activemq.advisory.AdvisorySupport;
023 import org.apache.activemq.broker.Broker;
024 import org.apache.activemq.broker.BrokerService;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.broker.ProducerBrokerExchange;
027 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
028 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ActiveMQTopic;
031 import org.apache.activemq.command.Message;
032 import org.apache.activemq.command.MessageAck;
033 import org.apache.activemq.command.MessageDispatchNotification;
034 import org.apache.activemq.command.ProducerInfo;
035 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
036 import org.apache.activemq.security.SecurityContext;
037 import org.apache.activemq.state.ProducerState;
038 import org.apache.activemq.store.MessageStore;
039 import org.apache.activemq.thread.Scheduler;
040 import org.apache.activemq.usage.MemoryUsage;
041 import org.apache.activemq.usage.SystemUsage;
042 import org.apache.activemq.usage.Usage;
043 import org.slf4j.Logger;
044
045 /**
046 *
047 */
048 public abstract class BaseDestination implements Destination {
049 /**
050 * The maximum number of messages to page in to the destination from
051 * persistent storage
052 */
053 public static final int MAX_PAGE_SIZE = 200;
054 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
055 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
056 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
057 public static final int MAX_PRODUCERS_TO_AUDIT = 64;
058 public static final int MAX_AUDIT_DEPTH = 2048;
059
060 protected final ActiveMQDestination destination;
061 protected final Broker broker;
062 protected final MessageStore store;
063 protected SystemUsage systemUsage;
064 protected MemoryUsage memoryUsage;
065 private boolean producerFlowControl = true;
066 private boolean alwaysRetroactive = false;
067 protected boolean warnOnProducerFlowControl = true;
068 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
069
070 private int maxProducersToAudit = 1024;
071 private int maxAuditDepth = 2048;
072 private boolean enableAudit = true;
073 private int maxPageSize = MAX_PAGE_SIZE;
074 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
075 private boolean useCache = true;
076 private int minimumMessageSize = 1024;
077 private boolean lazyDispatch = false;
078 private boolean advisoryForSlowConsumers;
079 private boolean advisoryForFastProducers;
080 private boolean advisoryForDiscardingMessages;
081 private boolean advisoryWhenFull;
082 private boolean advisoryForDelivery;
083 private boolean advisoryForConsumed;
084 private boolean sendAdvisoryIfNoConsumers;
085 protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
086 protected final BrokerService brokerService;
087 protected final Broker regionBroker;
088 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
089 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
090 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
091 protected int cursorMemoryHighWaterMark = 70;
092 protected int storeUsageHighWaterMark = 100;
093 private SlowConsumerStrategy slowConsumerStrategy;
094 private boolean prioritizedMessages;
095 private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
096 private boolean gcIfInactive;
097 private boolean gcWithNetworkConsumers;
098 private long lastActiveTime=0l;
099 private boolean reduceMemoryFootprint = false;
100 protected final Scheduler scheduler;
101 private boolean disposed = false;
102 private boolean doOptimzeMessageStorage = true;
103 /*
104 * percentage of in-flight messages above which optimize message store is disabled
105 */
106 private int optimizeMessageStoreInFlightLimit = 10;
107
108 /**
109 * @param brokerService
110 * @param store
111 * @param destination
112 * @param parentStats
113 * @throws Exception
114 */
115 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
116 this.brokerService = brokerService;
117 this.broker = brokerService.getBroker();
118 this.store = store;
119 this.destination = destination;
120 // let's copy the enabled property from the parent DestinationStatistics
121 this.destinationStatistics.setEnabled(parentStats.isEnabled());
122 this.destinationStatistics.setParent(parentStats);
123 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
124 this.memoryUsage = this.systemUsage.getMemoryUsage();
125 this.memoryUsage.setUsagePortion(1.0f);
126 this.regionBroker = brokerService.getRegionBroker();
127 this.scheduler = brokerService.getBroker().getScheduler();
128 }
129
130 /**
131 * initialize the destination
132 *
133 * @throws Exception
134 */
135 public void initialize() throws Exception {
136 // Let the store know what usage manager we are using so that he can
137 // flush messages to disk when usage gets high.
138 if (store != null) {
139 store.setMemoryUsage(this.memoryUsage);
140 }
141 }
142
143 /**
144 * @return the producerFlowControl
145 */
146 public boolean isProducerFlowControl() {
147 return producerFlowControl;
148 }
149
150 /**
151 * @param producerFlowControl the producerFlowControl to set
152 */
153 public void setProducerFlowControl(boolean producerFlowControl) {
154 this.producerFlowControl = producerFlowControl;
155 }
156
157 public boolean isAlwaysRetroactive() {
158 return alwaysRetroactive;
159 }
160
161 public void setAlwaysRetroactive(boolean alwaysRetroactive) {
162 this.alwaysRetroactive = alwaysRetroactive;
163 }
164
165 /**
166 * Set's the interval at which warnings about producers being blocked by
167 * resource usage will be triggered. Values of 0 or less will disable
168 * warnings
169 *
170 * @param blockedProducerWarningInterval the interval at which warning about
171 * blocked producers will be triggered.
172 */
173 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
174 this.blockedProducerWarningInterval = blockedProducerWarningInterval;
175 }
176
177 /**
178 *
179 * @return the interval at which warning about blocked producers will be
180 * triggered.
181 */
182 public long getBlockedProducerWarningInterval() {
183 return blockedProducerWarningInterval;
184 }
185
186 /**
187 * @return the maxProducersToAudit
188 */
189 public int getMaxProducersToAudit() {
190 return maxProducersToAudit;
191 }
192
193 /**
194 * @param maxProducersToAudit the maxProducersToAudit to set
195 */
196 public void setMaxProducersToAudit(int maxProducersToAudit) {
197 this.maxProducersToAudit = maxProducersToAudit;
198 }
199
200 /**
201 * @return the maxAuditDepth
202 */
203 public int getMaxAuditDepth() {
204 return maxAuditDepth;
205 }
206
207 /**
208 * @param maxAuditDepth the maxAuditDepth to set
209 */
210 public void setMaxAuditDepth(int maxAuditDepth) {
211 this.maxAuditDepth = maxAuditDepth;
212 }
213
214 /**
215 * @return the enableAudit
216 */
217 public boolean isEnableAudit() {
218 return enableAudit;
219 }
220
221 /**
222 * @param enableAudit the enableAudit to set
223 */
224 public void setEnableAudit(boolean enableAudit) {
225 this.enableAudit = enableAudit;
226 }
227
228 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
229 destinationStatistics.getProducers().increment();
230 this.lastActiveTime=0l;
231 }
232
233 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
234 destinationStatistics.getProducers().decrement();
235 }
236
237 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
238 destinationStatistics.getConsumers().increment();
239 this.lastActiveTime=0l;
240 }
241
242 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
243 destinationStatistics.getConsumers().decrement();
244 }
245
246
247 public final MemoryUsage getMemoryUsage() {
248 return memoryUsage;
249 }
250
251 public DestinationStatistics getDestinationStatistics() {
252 return destinationStatistics;
253 }
254
255 public ActiveMQDestination getActiveMQDestination() {
256 return destination;
257 }
258
259 public final String getName() {
260 return getActiveMQDestination().getPhysicalName();
261 }
262
263 public final MessageStore getMessageStore() {
264 return store;
265 }
266
267 public boolean isActive() {
268 boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
269 destinationStatistics.getProducers().getCount() != 0;
270 if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
271 isActive = hasRegularConsumers(getConsumers());
272 }
273 return isActive;
274 }
275
276 public int getMaxPageSize() {
277 return maxPageSize;
278 }
279
280 public void setMaxPageSize(int maxPageSize) {
281 this.maxPageSize = maxPageSize;
282 }
283
284 public int getMaxBrowsePageSize() {
285 return this.maxBrowsePageSize;
286 }
287
288 public void setMaxBrowsePageSize(int maxPageSize) {
289 this.maxBrowsePageSize = maxPageSize;
290 }
291
292 public int getMaxExpirePageSize() {
293 return this.maxExpirePageSize;
294 }
295
296 public void setMaxExpirePageSize(int maxPageSize) {
297 this.maxExpirePageSize = maxPageSize;
298 }
299
300 public void setExpireMessagesPeriod(long expireMessagesPeriod) {
301 this.expireMessagesPeriod = expireMessagesPeriod;
302 }
303
304 public long getExpireMessagesPeriod() {
305 return expireMessagesPeriod;
306 }
307
308 public boolean isUseCache() {
309 return useCache;
310 }
311
312 public void setUseCache(boolean useCache) {
313 this.useCache = useCache;
314 }
315
316 public int getMinimumMessageSize() {
317 return minimumMessageSize;
318 }
319
320 public void setMinimumMessageSize(int minimumMessageSize) {
321 this.minimumMessageSize = minimumMessageSize;
322 }
323
324 public boolean isLazyDispatch() {
325 return lazyDispatch;
326 }
327
328 public void setLazyDispatch(boolean lazyDispatch) {
329 this.lazyDispatch = lazyDispatch;
330 }
331
332 protected long getDestinationSequenceId() {
333 return regionBroker.getBrokerSequenceId();
334 }
335
336 /**
337 * @return the advisoryForSlowConsumers
338 */
339 public boolean isAdvisoryForSlowConsumers() {
340 return advisoryForSlowConsumers;
341 }
342
343 /**
344 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
345 */
346 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
347 this.advisoryForSlowConsumers = advisoryForSlowConsumers;
348 }
349
350 /**
351 * @return the advisoryForDiscardingMessages
352 */
353 public boolean isAdvisoryForDiscardingMessages() {
354 return advisoryForDiscardingMessages;
355 }
356
357 /**
358 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
359 * set
360 */
361 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
362 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
363 }
364
365 /**
366 * @return the advisoryWhenFull
367 */
368 public boolean isAdvisoryWhenFull() {
369 return advisoryWhenFull;
370 }
371
372 /**
373 * @param advisoryWhenFull the advisoryWhenFull to set
374 */
375 public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
376 this.advisoryWhenFull = advisoryWhenFull;
377 }
378
379 /**
380 * @return the advisoryForDelivery
381 */
382 public boolean isAdvisoryForDelivery() {
383 return advisoryForDelivery;
384 }
385
386 /**
387 * @param advisoryForDelivery the advisoryForDelivery to set
388 */
389 public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
390 this.advisoryForDelivery = advisoryForDelivery;
391 }
392
393 /**
394 * @return the advisoryForConsumed
395 */
396 public boolean isAdvisoryForConsumed() {
397 return advisoryForConsumed;
398 }
399
400 /**
401 * @param advisoryForConsumed the advisoryForConsumed to set
402 */
403 public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
404 this.advisoryForConsumed = advisoryForConsumed;
405 }
406
407 /**
408 * @return the advisdoryForFastProducers
409 */
410 public boolean isAdvisoryForFastProducers() {
411 return advisoryForFastProducers;
412 }
413
414 /**
415 * @param advisoryForFastProducers the advisdoryForFastProducers to set
416 */
417 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
418 this.advisoryForFastProducers = advisoryForFastProducers;
419 }
420
421 public boolean isSendAdvisoryIfNoConsumers() {
422 return sendAdvisoryIfNoConsumers;
423 }
424
425 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
426 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
427 }
428
429 /**
430 * @return the dead letter strategy
431 */
432 public DeadLetterStrategy getDeadLetterStrategy() {
433 return deadLetterStrategy;
434 }
435
436 /**
437 * set the dead letter strategy
438 *
439 * @param deadLetterStrategy
440 */
441 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
442 this.deadLetterStrategy = deadLetterStrategy;
443 }
444
445 public int getCursorMemoryHighWaterMark() {
446 return this.cursorMemoryHighWaterMark;
447 }
448
449 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
450 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
451 }
452
453 /**
454 * called when message is consumed
455 *
456 * @param context
457 * @param messageReference
458 */
459 public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
460 if (advisoryForConsumed) {
461 broker.messageConsumed(context, messageReference);
462 }
463 }
464
465 /**
466 * Called when message is delivered to the broker
467 *
468 * @param context
469 * @param messageReference
470 */
471 public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
472 if (advisoryForDelivery) {
473 broker.messageDelivered(context, messageReference);
474 }
475 }
476
477 /**
478 * Called when a message is discarded - e.g. running low on memory This will
479 * happen only if the policy is enabled - e.g. non durable topics
480 *
481 * @param context
482 * @param messageReference
483 */
484 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
485 if (advisoryForDiscardingMessages) {
486 broker.messageDiscarded(context, sub, messageReference);
487 }
488 }
489
490 /**
491 * Called when there is a slow consumer
492 *
493 * @param context
494 * @param subs
495 */
496 public void slowConsumer(ConnectionContext context, Subscription subs) {
497 if (advisoryForSlowConsumers) {
498 broker.slowConsumer(context, this, subs);
499 }
500 if (slowConsumerStrategy != null) {
501 slowConsumerStrategy.slowConsumer(context, subs);
502 }
503 }
504
505 /**
506 * Called to notify a producer is too fast
507 *
508 * @param context
509 * @param producerInfo
510 */
511 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
512 if (advisoryForFastProducers) {
513 broker.fastProducer(context, producerInfo, getActiveMQDestination());
514 }
515 }
516
517 /**
518 * Called when a Usage reaches a limit
519 *
520 * @param context
521 * @param usage
522 */
523 public void isFull(ConnectionContext context, Usage<?> usage) {
524 if (advisoryWhenFull) {
525 broker.isFull(context, this, usage);
526 }
527 }
528
529 public void dispose(ConnectionContext context) throws IOException {
530 if (this.store != null) {
531 this.store.removeAllMessages(context);
532 this.store.dispose(context);
533 }
534 this.destinationStatistics.setParent(null);
535 this.memoryUsage.stop();
536 this.disposed = true;
537 }
538
539 public boolean isDisposed() {
540 return this.disposed;
541 }
542
543 /**
544 * Provides a hook to allow messages with no consumer to be processed in
545 * some way - such as to send to a dead letter queue or something..
546 */
547 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
548 if (!msg.isPersistent()) {
549 if (isSendAdvisoryIfNoConsumers()) {
550 // allow messages with no consumers to be dispatched to a dead
551 // letter queue
552 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
553
554 Message message = msg.copy();
555 // The original destination and transaction id do not get
556 // filled when the message is first sent,
557 // it is only populated if the message is routed to another
558 // destination like the DLQ
559 if (message.getOriginalDestination() != null) {
560 message.setOriginalDestination(message.getDestination());
561 }
562 if (message.getOriginalTransactionId() != null) {
563 message.setOriginalTransactionId(message.getTransactionId());
564 }
565
566 ActiveMQTopic advisoryTopic;
567 if (destination.isQueue()) {
568 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
569 } else {
570 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
571 }
572 message.setDestination(advisoryTopic);
573 message.setTransactionId(null);
574
575 // Disable flow control for this since since we don't want
576 // to block.
577 boolean originalFlowControl = context.isProducerFlowControl();
578 try {
579 context.setProducerFlowControl(false);
580 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
581 producerExchange.setMutable(false);
582 producerExchange.setConnectionContext(context);
583 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
584 context.getBroker().send(producerExchange, message);
585 } finally {
586 context.setProducerFlowControl(originalFlowControl);
587 }
588
589 }
590 }
591 }
592 }
593
594 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
595 }
596
597 public final int getStoreUsageHighWaterMark() {
598 return this.storeUsageHighWaterMark;
599 }
600
601 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
602 this.storeUsageHighWaterMark = storeUsageHighWaterMark;
603 }
604
605 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
606 waitForSpace(context, usage, 100, warning);
607 }
608
609 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
610 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
611 getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning);
612 throw new ResourceAllocationException(warning);
613 }
614 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
615 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
616 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
617 throw new ResourceAllocationException(warning);
618 }
619 } else {
620 long start = System.currentTimeMillis();
621 long nextWarn = start;
622 while (!usage.waitForSpace(1000, highWaterMark)) {
623 if (context.getStopping().get()) {
624 throw new IOException("Connection closed, send aborted.");
625 }
626
627 long now = System.currentTimeMillis();
628 if (now >= nextWarn) {
629 getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
630 nextWarn = now + blockedProducerWarningInterval;
631 }
632 }
633 }
634 }
635
636 protected abstract Logger getLog();
637
638 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
639 this.slowConsumerStrategy = slowConsumerStrategy;
640 }
641
642 public SlowConsumerStrategy getSlowConsumerStrategy() {
643 return this.slowConsumerStrategy;
644 }
645
646
647 public boolean isPrioritizedMessages() {
648 return this.prioritizedMessages;
649 }
650
651 public void setPrioritizedMessages(boolean prioritizedMessages) {
652 this.prioritizedMessages = prioritizedMessages;
653 if (store != null) {
654 store.setPrioritizedMessages(prioritizedMessages);
655 }
656 }
657
658 /**
659 * @return the inactiveTimoutBeforeGC
660 */
661 public long getInactiveTimoutBeforeGC() {
662 return this.inactiveTimoutBeforeGC;
663 }
664
665 /**
666 * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
667 */
668 public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
669 this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
670 }
671
672 /**
673 * @return the gcIfInactive
674 */
675 public boolean isGcIfInactive() {
676 return this.gcIfInactive;
677 }
678
679 /**
680 * @param gcIfInactive the gcIfInactive to set
681 */
682 public void setGcIfInactive(boolean gcIfInactive) {
683 this.gcIfInactive = gcIfInactive;
684 }
685
686 /**
687 * Indicate if it is ok to gc destinations that have only network consumers
688 * @param gcWithNetworkConsumers
689 */
690 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
691 this.gcWithNetworkConsumers = gcWithNetworkConsumers;
692 }
693
694 public boolean isGcWithNetworkConsumers() {
695 return gcWithNetworkConsumers;
696 }
697
698 public void markForGC(long timeStamp) {
699 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
700 && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
701 this.lastActiveTime = timeStamp;
702 }
703 }
704
705 public boolean canGC() {
706 boolean result = false;
707 if (isGcIfInactive()&& this.lastActiveTime != 0l) {
708 if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
709 result = true;
710 }
711 }
712 return result;
713 }
714
715 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
716 this.reduceMemoryFootprint = reduceMemoryFootprint;
717 }
718
719 protected boolean isReduceMemoryFootprint() {
720 return this.reduceMemoryFootprint;
721 }
722
723 public boolean isDoOptimzeMessageStorage() {
724 return doOptimzeMessageStorage;
725 }
726
727 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
728 this.doOptimzeMessageStorage = doOptimzeMessageStorage;
729 }
730
731 public int getOptimizeMessageStoreInFlightLimit() {
732 return optimizeMessageStoreInFlightLimit;
733 }
734
735 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
736 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
737 }
738
739
740 public abstract List<Subscription> getConsumers();
741
742 protected boolean hasRegularConsumers(List<Subscription> consumers) {
743 boolean hasRegularConsumers = false;
744 for (Subscription subscription: consumers) {
745 if (!subscription.getConsumerInfo().isNetworkSubscription()) {
746 hasRegularConsumers = true;
747 break;
748 }
749 }
750 return hasRegularConsumers;
751 }
752
753 protected ConnectionContext createConnectionContext() {
754 ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
755 answer.setBroker(this.broker);
756 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
757 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
758 return answer;
759 }
760
761 protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
762 // the original ack may be a ranged ack, but we are trying to delete
763 // a specific
764 // message store here so we need to convert to a non ranged ack.
765 if (ack.getMessageCount() > 0) {
766 // Dup the ack
767 MessageAck a = new MessageAck();
768 ack.copy(a);
769 ack = a;
770 // Convert to non-ranged.
771 ack.setFirstMessageId(node.getMessageId());
772 ack.setLastMessageId(node.getMessageId());
773 ack.setMessageCount(1);
774 }
775 return ack;
776 }
777
778 }