001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.LinkedList;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.concurrent.CancellationException;
025 import java.util.concurrent.ConcurrentHashMap;
026 import java.util.concurrent.CopyOnWriteArrayList;
027 import java.util.concurrent.Future;
028 import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030 import org.apache.activemq.broker.BrokerService;
031 import org.apache.activemq.broker.ConnectionContext;
032 import org.apache.activemq.broker.ProducerBrokerExchange;
033 import org.apache.activemq.broker.region.policy.DispatchPolicy;
034 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
035 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
036 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
037 import org.apache.activemq.broker.util.InsertionCountList;
038 import org.apache.activemq.command.ActiveMQDestination;
039 import org.apache.activemq.command.ExceptionResponse;
040 import org.apache.activemq.command.Message;
041 import org.apache.activemq.command.MessageAck;
042 import org.apache.activemq.command.MessageId;
043 import org.apache.activemq.command.ProducerAck;
044 import org.apache.activemq.command.ProducerInfo;
045 import org.apache.activemq.command.Response;
046 import org.apache.activemq.command.SubscriptionInfo;
047 import org.apache.activemq.filter.MessageEvaluationContext;
048 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
049 import org.apache.activemq.store.MessageRecoveryListener;
050 import org.apache.activemq.store.TopicMessageStore;
051 import org.apache.activemq.thread.Task;
052 import org.apache.activemq.thread.TaskRunner;
053 import org.apache.activemq.thread.TaskRunnerFactory;
054 import org.apache.activemq.transaction.Synchronization;
055 import org.apache.activemq.util.SubscriptionKey;
056 import org.slf4j.Logger;
057 import org.slf4j.LoggerFactory;
058
059 /**
060 * The Topic is a destination that sends a copy of a message to every active
061 * Subscription registered.
062 */
063 public class Topic extends BaseDestination implements Task {
064 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
065 private final TopicMessageStore topicStore;
066 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
067 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
068 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
069 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
070 private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
071 private final TaskRunner taskRunner;
072 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
073 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
074 public void run() {
075 try {
076 Topic.this.taskRunner.wakeup();
077 } catch (InterruptedException e) {
078 }
079 };
080 };
081
082 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
083 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
084 super(brokerService, store, destination, parentStats);
085 this.topicStore = store;
086 // set default subscription recovery policy
087 subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
088 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
089 }
090
091 @Override
092 public void initialize() throws Exception {
093 super.initialize();
094 if (store != null) {
095 // AMQ-2586: Better to leave this stat at zero than to give the user
096 // misleading metrics.
097 // int messageCount = store.getMessageCount();
098 // destinationStatistics.getMessages().setCount(messageCount);
099 }
100 }
101
102 public List<Subscription> getConsumers() {
103 synchronized (consumers) {
104 return new ArrayList<Subscription>(consumers);
105 }
106 }
107
108 public boolean lock(MessageReference node, LockOwner sub) {
109 return true;
110 }
111
112 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
113 if (!sub.getConsumerInfo().isDurable()) {
114
115 // Do a retroactive recovery if needed.
116 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
117
118 // synchronize with dispatch method so that no new messages are sent
119 // while we are recovering a subscription to avoid out of order messages.
120 dispatchLock.writeLock().lock();
121 try {
122 boolean applyRecovery = false;
123 synchronized (consumers) {
124 if (!consumers.contains(sub)){
125 sub.add(context, this);
126 consumers.add(sub);
127 applyRecovery=true;
128 super.addSubscription(context, sub);
129 }
130 }
131 if (applyRecovery){
132 subscriptionRecoveryPolicy.recover(context, this, sub);
133 }
134 } finally {
135 dispatchLock.writeLock().unlock();
136 }
137
138 } else {
139 synchronized (consumers) {
140 if (!consumers.contains(sub)){
141 sub.add(context, this);
142 consumers.add(sub);
143 super.addSubscription(context, sub);
144 }
145 }
146 }
147 } else {
148 DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
149 super.addSubscription(context, sub);
150 sub.add(context, this);
151 if(dsub.isActive()) {
152 synchronized (consumers) {
153 boolean hasSubscription = false;
154
155 if (consumers.size() == 0) {
156 hasSubscription = false;
157 } else {
158 for (Subscription currentSub : consumers) {
159 if (currentSub.getConsumerInfo().isDurable()) {
160 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
161 if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
162 hasSubscription = true;
163 break;
164 }
165 }
166 }
167 }
168
169 if (!hasSubscription) {
170 consumers.add(sub);
171 }
172 }
173 }
174 durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
175 }
176 }
177
178 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
179 if (!sub.getConsumerInfo().isDurable()) {
180 super.removeSubscription(context, sub, lastDeliveredSequenceId);
181 synchronized (consumers) {
182 consumers.remove(sub);
183 }
184 }
185 sub.remove(context, this);
186 }
187
188 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
189 if (topicStore != null) {
190 topicStore.deleteSubscription(key.clientId, key.subscriptionName);
191 DurableTopicSubscription removed = durableSubcribers.remove(key);
192 if (removed != null) {
193 destinationStatistics.getConsumers().decrement();
194 // deactivate and remove
195 removed.deactivate(false);
196 consumers.remove(removed);
197 }
198 }
199 }
200
201 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
202 // synchronize with dispatch method so that no new messages are sent
203 // while we are recovering a subscription to avoid out of order messages.
204 dispatchLock.writeLock().lock();
205 try {
206
207 if (topicStore == null) {
208 return;
209 }
210
211 // Recover the durable subscription.
212 String clientId = subscription.getSubscriptionKey().getClientId();
213 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
214 String selector = subscription.getConsumerInfo().getSelector();
215 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
216 if (info != null) {
217 // Check to see if selector changed.
218 String s1 = info.getSelector();
219 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
220 // Need to delete the subscription
221 topicStore.deleteSubscription(clientId, subscriptionName);
222 info = null;
223 synchronized (consumers) {
224 consumers.remove(subscription);
225 }
226 } else {
227 synchronized (consumers) {
228 if (!consumers.contains(subscription)) {
229 consumers.add(subscription);
230 }
231 }
232 }
233 }
234
235 // Do we need to create the subscription?
236 if (info == null) {
237 info = new SubscriptionInfo();
238 info.setClientId(clientId);
239 info.setSelector(selector);
240 info.setSubscriptionName(subscriptionName);
241 info.setDestination(getActiveMQDestination());
242 // This destination is an actual destination id.
243 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
244 // This destination might be a pattern
245 synchronized (consumers) {
246 consumers.add(subscription);
247 topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
248 }
249 }
250
251 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
252 msgContext.setDestination(destination);
253 if (subscription.isRecoveryRequired()) {
254 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
255 public boolean recoverMessage(Message message) throws Exception {
256 message.setRegionDestination(Topic.this);
257 try {
258 msgContext.setMessageReference(message);
259 if (subscription.matches(message, msgContext)) {
260 subscription.add(message);
261 }
262 } catch (IOException e) {
263 LOG.error("Failed to recover this message " + message);
264 }
265 return true;
266 }
267
268 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
269 throw new RuntimeException("Should not be called.");
270 }
271
272 public boolean hasSpace() {
273 return true;
274 }
275
276 public boolean isDuplicate(MessageId id) {
277 return false;
278 }
279 });
280 }
281 } finally {
282 dispatchLock.writeLock().unlock();
283 }
284 }
285
286 public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
287 synchronized (consumers) {
288 consumers.remove(sub);
289 }
290 sub.remove(context, this);
291 }
292
293 protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
294 if (subscription.getConsumerInfo().isRetroactive()) {
295 subscriptionRecoveryPolicy.recover(context, this, subscription);
296 }
297 }
298
299 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
300 final ConnectionContext context = producerExchange.getConnectionContext();
301
302 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
303 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
304 && !context.isInRecoveryMode();
305
306 // There is delay between the client sending it and it arriving at the
307 // destination.. it may have expired.
308 if (message.isExpired()) {
309 broker.messageExpired(context, message, null);
310 getDestinationStatistics().getExpired().increment();
311 if (sendProducerAck) {
312 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
313 context.getConnection().dispatchAsync(ack);
314 }
315 return;
316 }
317
318 if (memoryUsage.isFull()) {
319 isFull(context, memoryUsage);
320 fastProducer(context, producerInfo);
321
322 if (isProducerFlowControl() && context.isProducerFlowControl()) {
323
324 if (warnOnProducerFlowControl) {
325 warnOnProducerFlowControl = false;
326 LOG.info(memoryUsage + ", Usage Manager memory limit reached for "
327 + getActiveMQDestination().getQualifiedName()
328 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
329 + " See http://activemq.apache.org/producer-flow-control.html for more info");
330 }
331
332 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
333 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
334 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
335 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
336 + " See http://activemq.apache.org/producer-flow-control.html for more info");
337 }
338
339 // We can avoid blocking due to low usage if the producer is sending a sync message or
340 // if it is using a producer window
341 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
342 synchronized (messagesWaitingForSpace) {
343 messagesWaitingForSpace.add(new Runnable() {
344 public void run() {
345 try {
346
347 // While waiting for space to free up... the
348 // message may have expired.
349 if (message.isExpired()) {
350 broker.messageExpired(context, message, null);
351 getDestinationStatistics().getExpired().increment();
352 } else {
353 doMessageSend(producerExchange, message);
354 }
355
356 if (sendProducerAck) {
357 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
358 .getSize());
359 context.getConnection().dispatchAsync(ack);
360 } else {
361 Response response = new Response();
362 response.setCorrelationId(message.getCommandId());
363 context.getConnection().dispatchAsync(response);
364 }
365
366 } catch (Exception e) {
367 if (!sendProducerAck && !context.isInRecoveryMode()) {
368 ExceptionResponse response = new ExceptionResponse(e);
369 response.setCorrelationId(message.getCommandId());
370 context.getConnection().dispatchAsync(response);
371 }
372 }
373
374 }
375 });
376
377 registerCallbackForNotFullNotification();
378 context.setDontSendReponse(true);
379 return;
380 }
381
382 } else {
383 // Producer flow control cannot be used, so we have do the flow control
384 // at the broker by blocking this thread until there is space available.
385
386 if (memoryUsage.isFull()) {
387 if (context.isInTransaction()) {
388
389 int count = 0;
390 while (!memoryUsage.waitForSpace(1000)) {
391 if (context.getStopping().get()) {
392 throw new IOException("Connection closed, send aborted.");
393 }
394 if (count > 2 && context.isInTransaction()) {
395 count = 0;
396 int size = context.getTransaction().size();
397 LOG.warn("Waiting for space to send transacted message - transaction elements = "
398 + size + " need more space to commit. Message = " + message);
399 }
400 }
401 } else {
402 waitForSpace(
403 context,
404 memoryUsage,
405 "Usage Manager Memory Usage limit reached. Stopping producer ("
406 + message.getProducerId()
407 + ") to prevent flooding "
408 + getActiveMQDestination().getQualifiedName()
409 + "."
410 + " See http://activemq.apache.org/producer-flow-control.html for more info");
411 }
412 }
413
414 // The usage manager could have delayed us by the time
415 // we unblock the message could have expired..
416 if (message.isExpired()) {
417 getDestinationStatistics().getExpired().increment();
418 if (LOG.isDebugEnabled()) {
419 LOG.debug("Expired message: " + message);
420 }
421 return;
422 }
423 }
424 }
425 }
426
427 doMessageSend(producerExchange, message);
428 messageDelivered(context, message);
429 if (sendProducerAck) {
430 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
431 context.getConnection().dispatchAsync(ack);
432 }
433 }
434
435 /**
436 * do send the message - this needs to be synchronized to ensure messages
437 * are stored AND dispatched in the right order
438 *
439 * @param producerExchange
440 * @param message
441 * @throws IOException
442 * @throws Exception
443 */
444 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
445 throws IOException, Exception {
446 final ConnectionContext context = producerExchange.getConnectionContext();
447 message.setRegionDestination(this);
448 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
449 Future<Object> result = null;
450
451 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
452 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
453 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
454 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
455 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
456 + " See http://activemq.apache.org/producer-flow-control.html for more info";
457 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
458 throw new javax.jms.ResourceAllocationException(logMessage);
459 }
460
461 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
462 }
463 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
464 }
465
466 message.incrementReferenceCount();
467
468 if (context.isInTransaction()) {
469 context.getTransaction().addSynchronization(new Synchronization() {
470 @Override
471 public void afterCommit() throws Exception {
472 // It could take while before we receive the commit
473 // operation.. by that time the message could have
474 // expired..
475 if (broker.isExpired(message)) {
476 getDestinationStatistics().getExpired().increment();
477 broker.messageExpired(context, message, null);
478 message.decrementReferenceCount();
479 return;
480 }
481 try {
482 dispatch(context, message);
483 } finally {
484 message.decrementReferenceCount();
485 }
486 }
487 });
488
489 } else {
490 try {
491 dispatch(context, message);
492 } finally {
493 message.decrementReferenceCount();
494 }
495 }
496
497 if (result != null && !result.isCancelled()) {
498 try {
499 result.get();
500 } catch (CancellationException e) {
501 // ignore - the task has been cancelled if the message
502 // has already been deleted
503 }
504 }
505 }
506
507 private boolean canOptimizeOutPersistence() {
508 return durableSubcribers.size() == 0;
509 }
510
511 @Override
512 public String toString() {
513 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
514 }
515
516 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
517 final MessageReference node) throws IOException {
518 if (topicStore != null && node.isPersistent()) {
519 DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
520 SubscriptionKey key = dsub.getSubscriptionKey();
521 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
522 convertToNonRangedAck(ack, node));
523 }
524 messageConsumed(context, node);
525 }
526
527 public void gc() {
528 }
529
530 public Message loadMessage(MessageId messageId) throws IOException {
531 return topicStore != null ? topicStore.getMessage(messageId) : null;
532 }
533
534 public void start() throws Exception {
535 this.subscriptionRecoveryPolicy.start();
536 if (memoryUsage != null) {
537 memoryUsage.start();
538 }
539
540 if (getExpireMessagesPeriod() > 0) {
541 scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
542 }
543 }
544
545 public void stop() throws Exception {
546 if (taskRunner != null) {
547 taskRunner.shutdown();
548 }
549 this.subscriptionRecoveryPolicy.stop();
550 if (memoryUsage != null) {
551 memoryUsage.stop();
552 }
553 if (this.topicStore != null) {
554 this.topicStore.stop();
555 }
556
557 scheduler.cancel(expireMessagesTask);
558 }
559
560 public Message[] browse() {
561 final List<Message> result = new ArrayList<Message>();
562 doBrowse(result, getMaxBrowsePageSize());
563 return result.toArray(new Message[result.size()]);
564 }
565
566 private void doBrowse(final List<Message> browseList, final int max) {
567 try {
568 if (topicStore != null) {
569 final List<Message> toExpire = new ArrayList<Message>();
570 topicStore.recover(new MessageRecoveryListener() {
571 public boolean recoverMessage(Message message) throws Exception {
572 if (message.isExpired()) {
573 toExpire.add(message);
574 }
575 browseList.add(message);
576 return true;
577 }
578
579 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
580 return true;
581 }
582
583 public boolean hasSpace() {
584 return browseList.size() < max;
585 }
586
587 public boolean isDuplicate(MessageId id) {
588 return false;
589 }
590 });
591 final ConnectionContext connectionContext = createConnectionContext();
592 for (Message message : toExpire) {
593 for (DurableTopicSubscription sub : durableSubcribers.values()) {
594 if (!sub.isActive()) {
595 messageExpired(connectionContext, sub, message);
596 }
597 }
598 }
599 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
600 if (msgs != null) {
601 for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
602 browseList.add(msgs[i]);
603 }
604 }
605 }
606 } catch (Throwable e) {
607 LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
608 }
609 }
610
611 public boolean iterate() {
612 synchronized (messagesWaitingForSpace) {
613 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
614 Runnable op = messagesWaitingForSpace.removeFirst();
615 op.run();
616 }
617
618 if (!messagesWaitingForSpace.isEmpty()) {
619 registerCallbackForNotFullNotification();
620 }
621 }
622 return false;
623 }
624
625 private void registerCallbackForNotFullNotification() {
626 // If the usage manager is not full, then the task will not
627 // get called..
628 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
629 // so call it directly here.
630 sendMessagesWaitingForSpaceTask.run();
631 }
632 }
633
634 // Properties
635 // -------------------------------------------------------------------------
636
637 public DispatchPolicy getDispatchPolicy() {
638 return dispatchPolicy;
639 }
640
641 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
642 this.dispatchPolicy = dispatchPolicy;
643 }
644
645 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
646 return subscriptionRecoveryPolicy;
647 }
648
649 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
650 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
651 }
652
653 // Implementation methods
654 // -------------------------------------------------------------------------
655
656 public final void wakeup() {
657 }
658
659 protected void dispatch(final ConnectionContext context, Message message) throws Exception {
660 // AMQ-2586: Better to leave this stat at zero than to give the user
661 // misleading metrics.
662 // destinationStatistics.getMessages().increment();
663 destinationStatistics.getEnqueues().increment();
664 MessageEvaluationContext msgContext = null;
665
666 dispatchLock.readLock().lock();
667 try {
668 if (!subscriptionRecoveryPolicy.add(context, message)) {
669 return;
670 }
671 synchronized (consumers) {
672 if (consumers.isEmpty()) {
673 onMessageWithNoConsumers(context, message);
674 return;
675 }
676 }
677 msgContext = context.getMessageEvaluationContext();
678 msgContext.setDestination(destination);
679 msgContext.setMessageReference(message);
680 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
681 onMessageWithNoConsumers(context, message);
682 }
683
684 } finally {
685 dispatchLock.readLock().unlock();
686 if (msgContext != null) {
687 msgContext.clear();
688 }
689 }
690 }
691
692 private final Runnable expireMessagesTask = new Runnable() {
693 public void run() {
694 List<Message> browsedMessages = new InsertionCountList<Message>();
695 doBrowse(browsedMessages, getMaxExpirePageSize());
696 }
697 };
698
699 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
700 broker.messageExpired(context, reference, subs);
701 // AMQ-2586: Better to leave this stat at zero than to give the user
702 // misleading metrics.
703 // destinationStatistics.getMessages().decrement();
704 destinationStatistics.getEnqueues().decrement();
705 destinationStatistics.getExpired().increment();
706 MessageAck ack = new MessageAck();
707 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
708 ack.setDestination(destination);
709 ack.setMessageID(reference.getMessageId());
710 try {
711 if (subs instanceof DurableTopicSubscription) {
712 ((DurableTopicSubscription)subs).removePending(reference);
713 }
714 acknowledge(context, subs, ack, reference);
715 } catch (Exception e) {
716 LOG.error("Failed to remove expired Message from the store ", e);
717 }
718 }
719
720 @Override
721 protected Logger getLog() {
722 return LOG;
723 }
724
725 protected boolean isOptimizeStorage(){
726 boolean result = false;
727
728 if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){
729 result = true;
730 for (DurableTopicSubscription s : durableSubcribers.values()) {
731 if (s.isActive()== false){
732 result = false;
733 break;
734 }
735 if (s.getPrefetchSize()==0){
736 result = false;
737 break;
738 }
739 if (s.isSlowConsumer()){
740 result = false;
741 break;
742 }
743 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
744 result = false;
745 break;
746 }
747 }
748 }
749 return result;
750 }
751
752 /**
753 * force a reread of the store - after transaction recovery completion
754 */
755 public void clearPendingMessages() {
756 dispatchLock.readLock().lock();
757 try {
758 for (DurableTopicSubscription durableTopicSubscription : durableSubcribers.values()) {
759 clearPendingAndDispatch(durableTopicSubscription);
760 }
761 } finally {
762 dispatchLock.readLock().unlock();
763 }
764 }
765
766 private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
767 synchronized (durableTopicSubscription.pendingLock) {
768 durableTopicSubscription.pending.clear();
769 try {
770 durableTopicSubscription.dispatchPending();
771 } catch (IOException exception) {
772 LOG.warn("After clear of pending, failed to dispatch to: " +
773 durableTopicSubscription + ", for :" + destination + ", pending: " +
774 durableTopicSubscription.pending, exception);
775 }
776 }
777 }
778
779 public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
780 return durableSubcribers;
781 }
782 }