001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicLong; 024 025import javax.jms.JMSException; 026 027import org.apache.activemq.ActiveMQMessageAudit; 028import org.apache.activemq.broker.Broker; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; 031import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 033import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; 034import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; 035import org.apache.activemq.command.ConsumerControl; 036import org.apache.activemq.command.ConsumerInfo; 037import org.apache.activemq.command.Message; 038import org.apache.activemq.command.MessageAck; 039import org.apache.activemq.command.MessageDispatch; 040import org.apache.activemq.command.MessageDispatchNotification; 041import org.apache.activemq.command.MessageId; 042import org.apache.activemq.command.MessagePull; 043import org.apache.activemq.command.Response; 044import org.apache.activemq.thread.Scheduler; 045import org.apache.activemq.transaction.Synchronization; 046import org.apache.activemq.transport.TransmitCallback; 047import org.apache.activemq.usage.SystemUsage; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051public class TopicSubscription extends AbstractSubscription { 052 053 private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class); 054 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0); 055 056 protected PendingMessageCursor matched; 057 protected final SystemUsage usageManager; 058 boolean singleDestination = true; 059 Destination destination; 060 private final Scheduler scheduler; 061 062 private int maximumPendingMessages = -1; 063 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); 064 private int discarded; 065 private final Object matchedListMutex = new Object(); 066 private int memoryUsageHighWaterMark = 95; 067 // allow duplicate suppression in a ring network of brokers 068 protected int maxProducersToAudit = 1024; 069 protected int maxAuditDepth = 1000; 070 protected boolean enableAudit = false; 071 protected ActiveMQMessageAudit audit; 072 protected boolean active = false; 073 protected boolean discarding = false; 074 private boolean useTopicSubscriptionInflightStats = true; 075 076 //Used for inflight message size calculations 077 protected final Object dispatchLock = new Object(); 078 protected final List<DispatchedNode> dispatched = new ArrayList<>(); 079 080 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { 081 super(broker, context, info); 082 this.usageManager = usageManager; 083 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; 084 if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) { 085 this.matched = new VMPendingMessageCursor(false); 086 } else { 087 this.matched = new FilePendingMessageCursor(broker,matchedName,false); 088 } 089 090 this.scheduler = broker.getScheduler(); 091 } 092 093 public void init() throws Exception { 094 this.matched.setSystemUsage(usageManager); 095 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 096 this.matched.start(); 097 if (enableAudit) { 098 audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit); 099 } 100 this.active=true; 101 } 102 103 @Override 104 public void add(MessageReference node) throws Exception { 105 if (isDuplicate(node)) { 106 return; 107 } 108 // Lets use an indirect reference so that we can associate a unique 109 // locator /w the message. 110 node = new IndirectMessageReference(node.getMessage()); 111 getSubscriptionStatistics().getEnqueues().increment(); 112 synchronized (matchedListMutex) { 113 // if this subscriber is already discarding a message, we don't want to add 114 // any more messages to it as those messages can only be advisories generated in the process, 115 // which can trigger the recursive call loop 116 if (discarding) return; 117 118 if (!isFull() && matched.isEmpty()) { 119 // if maximumPendingMessages is set we will only discard messages which 120 // have not been dispatched (i.e. we allow the prefetch buffer to be filled) 121 dispatch(node); 122 setSlowConsumer(false); 123 } else { 124 if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { 125 // Slow consumers should log and set their state as such. 126 if (!isSlowConsumer()) { 127 LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString()); 128 setSlowConsumer(true); 129 for (Destination dest: destinations) { 130 dest.slowConsumer(getContext(), this); 131 } 132 } 133 } 134 if (maximumPendingMessages != 0) { 135 boolean warnedAboutWait = false; 136 while (active) { 137 while (matched.isFull()) { 138 if (getContext().getStopping().get()) { 139 LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId()); 140 getSubscriptionStatistics().getEnqueues().decrement(); 141 return; 142 } 143 if (!warnedAboutWait) { 144 LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.", 145 new Object[]{ 146 toString(), 147 matched, 148 matched.getSystemUsage().getTempUsage().getPercentUsage(), 149 matched.getSystemUsage().getMemoryUsage().getPercentUsage() 150 }); 151 warnedAboutWait = true; 152 } 153 matchedListMutex.wait(20); 154 } 155 // Temporary storage could be full - so just try to add the message 156 // see https://issues.apache.org/activemq/browse/AMQ-2475 157 if (matched.tryAddMessageLast(node, 10)) { 158 break; 159 } 160 } 161 if (maximumPendingMessages > 0) { 162 // calculate the high water mark from which point we 163 // will eagerly evict expired messages 164 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); 165 if (maximumPendingMessages > 0 && maximumPendingMessages < max) { 166 max = maximumPendingMessages; 167 } 168 if (!matched.isEmpty() && matched.size() > max) { 169 removeExpiredMessages(); 170 } 171 // lets discard old messages as we are a slow consumer 172 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { 173 int pageInSize = matched.size() - maximumPendingMessages; 174 // only page in a 1000 at a time - else we could blow the memory 175 pageInSize = Math.max(1000, pageInSize); 176 LinkedList<MessageReference> list = null; 177 MessageReference[] oldMessages=null; 178 synchronized(matched){ 179 list = matched.pageInList(pageInSize); 180 oldMessages = messageEvictionStrategy.evictMessages(list); 181 for (MessageReference ref : list) { 182 ref.decrementReferenceCount(); 183 } 184 } 185 int messagesToEvict = 0; 186 if (oldMessages != null){ 187 messagesToEvict = oldMessages.length; 188 for (int i = 0; i < messagesToEvict; i++) { 189 MessageReference oldMessage = oldMessages[i]; 190 discard(oldMessage); 191 } 192 } 193 // lets avoid an infinite loop if we are given a bad eviction strategy 194 // for a bad strategy lets just not evict 195 if (messagesToEvict == 0) { 196 LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{ 197 destination, messageEvictionStrategy, list.size() 198 }); 199 break; 200 } 201 } 202 } 203 dispatchMatched(); 204 } 205 } 206 } 207 } 208 209 private boolean isDuplicate(MessageReference node) { 210 boolean duplicate = false; 211 if (enableAudit && audit != null) { 212 duplicate = audit.isDuplicate(node); 213 if (LOG.isDebugEnabled()) { 214 if (duplicate) { 215 LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId()); 216 } 217 } 218 } 219 return duplicate; 220 } 221 222 /** 223 * Discard any expired messages from the matched list. Called from a 224 * synchronized block. 225 * 226 * @throws IOException 227 */ 228 protected void removeExpiredMessages() throws IOException { 229 try { 230 matched.reset(); 231 while (matched.hasNext()) { 232 MessageReference node = matched.next(); 233 node.decrementReferenceCount(); 234 if (node.isExpired()) { 235 matched.remove(); 236 node.decrementReferenceCount(); 237 if (broker.isExpired(node)) { 238 ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment(); 239 broker.messageExpired(getContext(), node, this); 240 } 241 break; 242 } 243 } 244 } finally { 245 matched.release(); 246 } 247 } 248 249 @Override 250 public void processMessageDispatchNotification(MessageDispatchNotification mdn) { 251 synchronized (matchedListMutex) { 252 try { 253 matched.reset(); 254 while (matched.hasNext()) { 255 MessageReference node = matched.next(); 256 node.decrementReferenceCount(); 257 if (node.getMessageId().equals(mdn.getMessageId())) { 258 synchronized(dispatchLock) { 259 matched.remove(); 260 getSubscriptionStatistics().getDispatched().increment(); 261 if (isUseTopicSubscriptionInflightStats()) { 262 dispatched.add(new DispatchedNode(node)); 263 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 264 } 265 node.decrementReferenceCount(); 266 } 267 break; 268 } 269 } 270 } finally { 271 matched.release(); 272 } 273 } 274 } 275 276 @Override 277 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { 278 super.acknowledge(context, ack); 279 280 if (ack.isStandardAck()) { 281 updateStatsOnAck(context, ack); 282 } else if (ack.isPoisonAck()) { 283 if (ack.isInTransaction()) { 284 throw new JMSException("Poison ack cannot be transacted: " + ack); 285 } 286 updateStatsOnAck(context, ack); 287 contractPrefetchExtension(ack.getMessageCount()); 288 } else if (ack.isIndividualAck()) { 289 updateStatsOnAck(context, ack); 290 if (ack.isInTransaction()) { 291 expandPrefetchExtension(1); 292 } 293 } else if (ack.isExpiredAck()) { 294 updateStatsOnAck(ack); 295 contractPrefetchExtension(ack.getMessageCount()); 296 } else if (ack.isDeliveredAck()) { 297 // Message was delivered but not acknowledged: update pre-fetch counters. 298 expandPrefetchExtension(ack.getMessageCount()); 299 } else if (ack.isRedeliveredAck()) { 300 // No processing for redelivered needed 301 return; 302 } else { 303 throw new JMSException("Invalid acknowledgment: " + ack); 304 } 305 306 dispatchMatched(); 307 } 308 309 private void updateStatsOnAck(final ConnectionContext context, final MessageAck ack) { 310 if (context.isInTransaction()) { 311 context.getTransaction().addSynchronization(new Synchronization() { 312 313 @Override 314 public void afterRollback() { 315 contractPrefetchExtension(ack.getMessageCount()); 316 } 317 318 @Override 319 public void afterCommit() throws Exception { 320 contractPrefetchExtension(ack.getMessageCount()); 321 updateStatsOnAck(ack); 322 dispatchMatched(); 323 } 324 }); 325 } else { 326 updateStatsOnAck(ack); 327 } 328 } 329 330 @Override 331 public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception { 332 333 // The slave should not deliver pull messages. 334 if (getPrefetchSize() == 0) { 335 336 final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount(); 337 prefetchExtension.set(pull.getQuantity()); 338 dispatchMatched(); 339 340 // If there was nothing dispatched.. we may need to setup a timeout. 341 if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) { 342 343 // immediate timeout used by receiveNoWait() 344 if (pull.getTimeout() == -1) { 345 // Send a NULL message to signal nothing pending. 346 dispatch(null); 347 prefetchExtension.set(0); 348 } 349 350 if (pull.getTimeout() > 0) { 351 scheduler.executeAfterDelay(new Runnable() { 352 353 @Override 354 public void run() { 355 pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone()); 356 } 357 }, pull.getTimeout()); 358 } 359 } 360 } 361 return null; 362 } 363 364 /** 365 * Occurs when a pull times out. If nothing has been dispatched since the 366 * timeout was setup, then send the NULL message. 367 */ 368 private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) { 369 synchronized (matchedListMutex) { 370 if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) { 371 try { 372 dispatch(null); 373 } catch (Exception e) { 374 context.getConnection().serviceException(e); 375 } finally { 376 prefetchExtension.set(0); 377 } 378 } 379 } 380 } 381 382 /** 383 * Update the statistics on message ack. 384 * @param ack 385 */ 386 private void updateStatsOnAck(final MessageAck ack) { 387 //Allow disabling inflight stats to save memory usage 388 if (isUseTopicSubscriptionInflightStats()) { 389 synchronized(dispatchLock) { 390 boolean inAckRange = false; 391 List<DispatchedNode> removeList = new ArrayList<>(); 392 for (final DispatchedNode node : dispatched) { 393 MessageId messageId = node.getMessageId(); 394 if (ack.getFirstMessageId() == null 395 || ack.getFirstMessageId().equals(messageId)) { 396 inAckRange = true; 397 } 398 if (inAckRange) { 399 removeList.add(node); 400 if (ack.getLastMessageId().equals(messageId)) { 401 break; 402 } 403 } 404 } 405 406 for (final DispatchedNode node : removeList) { 407 dispatched.remove(node); 408 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 409 410 final Destination destination = node.getDestination(); 411 incrementStatsOnAck(destination, ack, 1); 412 if (!ack.isInTransaction()) { 413 contractPrefetchExtension(1); 414 } 415 } 416 } 417 } else { 418 if (singleDestination && destination != null) { 419 incrementStatsOnAck(destination, ack, ack.getMessageCount()); 420 } 421 if (!ack.isInTransaction()) { 422 contractPrefetchExtension(ack.getMessageCount()); 423 } 424 } 425 } 426 427 private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) { 428 getSubscriptionStatistics().getDequeues().add(count); 429 destination.getDestinationStatistics().getDequeues().add(count); 430 destination.getDestinationStatistics().getInflight().subtract(count); 431 if (info.isNetworkSubscription()) { 432 destination.getDestinationStatistics().getForwards().add(count); 433 } 434 if (ack.isExpiredAck()) { 435 destination.getDestinationStatistics().getExpired().add(count); 436 } 437 } 438 439 @Override 440 public int countBeforeFull() { 441 return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize(); 442 } 443 444 @Override 445 public int getPendingQueueSize() { 446 return matched(); 447 } 448 449 @Override 450 public long getPendingMessageSize() { 451 synchronized (matchedListMutex) { 452 return matched.messageSize(); 453 } 454 } 455 456 @Override 457 public int getDispatchedQueueSize() { 458 return (int)(getSubscriptionStatistics().getDispatched().getCount() - 459 getSubscriptionStatistics().getDequeues().getCount()); 460 } 461 462 public int getMaximumPendingMessages() { 463 return maximumPendingMessages; 464 } 465 466 @Override 467 public long getDispatchedCounter() { 468 return getSubscriptionStatistics().getDispatched().getCount(); 469 } 470 471 @Override 472 public long getEnqueueCounter() { 473 return getSubscriptionStatistics().getEnqueues().getCount(); 474 } 475 476 @Override 477 public long getDequeueCounter() { 478 return getSubscriptionStatistics().getDequeues().getCount(); 479 } 480 481 /** 482 * @return the number of messages discarded due to being a slow consumer 483 */ 484 public int discarded() { 485 synchronized (matchedListMutex) { 486 return discarded; 487 } 488 } 489 490 /** 491 * @return the number of matched messages (messages targeted for the 492 * subscription but not yet able to be dispatched due to the 493 * prefetch buffer being full). 494 */ 495 public int matched() { 496 synchronized (matchedListMutex) { 497 return matched.size(); 498 } 499 } 500 501 /** 502 * Sets the maximum number of pending messages that can be matched against 503 * this consumer before old messages are discarded. 504 */ 505 public void setMaximumPendingMessages(int maximumPendingMessages) { 506 this.maximumPendingMessages = maximumPendingMessages; 507 } 508 509 public MessageEvictionStrategy getMessageEvictionStrategy() { 510 return messageEvictionStrategy; 511 } 512 513 /** 514 * Sets the eviction strategy used to decide which message to evict when the 515 * slow consumer needs to discard messages 516 */ 517 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 518 this.messageEvictionStrategy = messageEvictionStrategy; 519 } 520 521 public int getMaxProducersToAudit() { 522 return maxProducersToAudit; 523 } 524 525 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { 526 this.maxProducersToAudit = maxProducersToAudit; 527 if (audit != null) { 528 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); 529 } 530 } 531 532 public int getMaxAuditDepth() { 533 return maxAuditDepth; 534 } 535 536 public synchronized void setMaxAuditDepth(int maxAuditDepth) { 537 this.maxAuditDepth = maxAuditDepth; 538 if (audit != null) { 539 audit.setAuditDepth(maxAuditDepth); 540 } 541 } 542 543 public boolean isEnableAudit() { 544 return enableAudit; 545 } 546 547 public synchronized void setEnableAudit(boolean enableAudit) { 548 this.enableAudit = enableAudit; 549 if (enableAudit && audit == null) { 550 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 551 } 552 } 553 554 // Implementation methods 555 // ------------------------------------------------------------------------- 556 @Override 557 public boolean isFull() { 558 return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize(); 559 } 560 561 @Override 562 public int getInFlightSize() { 563 return getDispatchedQueueSize(); 564 } 565 566 /** 567 * @return true when 60% or more room is left for dispatching messages 568 */ 569 @Override 570 public boolean isLowWaterMark() { 571 return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); 572 } 573 574 /** 575 * @return true when 10% or less room is left for dispatching messages 576 */ 577 @Override 578 public boolean isHighWaterMark() { 579 return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); 580 } 581 582 /** 583 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 584 */ 585 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 586 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 587 } 588 589 /** 590 * @return the memoryUsageHighWaterMark 591 */ 592 public int getMemoryUsageHighWaterMark() { 593 return this.memoryUsageHighWaterMark; 594 } 595 596 /** 597 * @return the usageManager 598 */ 599 public SystemUsage getUsageManager() { 600 return this.usageManager; 601 } 602 603 /** 604 * @return the matched 605 */ 606 public PendingMessageCursor getMatched() { 607 return this.matched; 608 } 609 610 /** 611 * @param matched the matched to set 612 */ 613 public void setMatched(PendingMessageCursor matched) { 614 this.matched = matched; 615 } 616 617 /** 618 * inform the MessageConsumer on the client to change it's prefetch 619 * 620 * @param newPrefetch 621 */ 622 @Override 623 public void updateConsumerPrefetch(int newPrefetch) { 624 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 625 ConsumerControl cc = new ConsumerControl(); 626 cc.setConsumerId(info.getConsumerId()); 627 cc.setPrefetch(newPrefetch); 628 context.getConnection().dispatchAsync(cc); 629 } 630 } 631 632 private void dispatchMatched() throws IOException { 633 synchronized (matchedListMutex) { 634 if (!matched.isEmpty() && !isFull()) { 635 try { 636 matched.reset(); 637 638 while (matched.hasNext() && !isFull()) { 639 MessageReference message = matched.next(); 640 message.decrementReferenceCount(); 641 matched.remove(); 642 // Message may have been sitting in the matched list a while 643 // waiting for the consumer to ak the message. 644 if (message.isExpired()) { 645 discard(message); 646 continue; // just drop it. 647 } 648 dispatch(message); 649 } 650 } finally { 651 matched.release(); 652 } 653 } 654 } 655 } 656 657 private void dispatch(final MessageReference node) throws IOException { 658 Message message = node != null ? node.getMessage() : null; 659 if (node != null) { 660 node.incrementReferenceCount(); 661 } 662 // Make sure we can dispatch a message. 663 MessageDispatch md = new MessageDispatch(); 664 md.setMessage(message); 665 md.setConsumerId(info.getConsumerId()); 666 if (node != null) { 667 md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); 668 synchronized(dispatchLock) { 669 getSubscriptionStatistics().getDispatched().increment(); 670 if (isUseTopicSubscriptionInflightStats()) { 671 dispatched.add(new DispatchedNode(node)); 672 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 673 } 674 } 675 676 // Keep track if this subscription is receiving messages from a single destination. 677 if (singleDestination) { 678 if (destination == null) { 679 destination = (Destination)node.getRegionDestination(); 680 } else { 681 if (destination != node.getRegionDestination()) { 682 singleDestination = false; 683 } 684 } 685 } 686 687 if (getPrefetchSize() == 0) { 688 decrementPrefetchExtension(1); 689 } 690 } 691 692 if (info.isDispatchAsync()) { 693 if (node != null) { 694 md.setTransmitCallback(new TransmitCallback() { 695 696 @Override 697 public void onSuccess() { 698 Destination regionDestination = (Destination) node.getRegionDestination(); 699 regionDestination.getDestinationStatistics().getDispatched().increment(); 700 regionDestination.getDestinationStatistics().getInflight().increment(); 701 node.decrementReferenceCount(); 702 } 703 704 @Override 705 public void onFailure() { 706 Destination regionDestination = (Destination) node.getRegionDestination(); 707 regionDestination.getDestinationStatistics().getDispatched().increment(); 708 regionDestination.getDestinationStatistics().getInflight().increment(); 709 node.decrementReferenceCount(); 710 } 711 }); 712 } 713 context.getConnection().dispatchAsync(md); 714 } else { 715 context.getConnection().dispatchSync(md); 716 if (node != null) { 717 Destination regionDestination = (Destination) node.getRegionDestination(); 718 regionDestination.getDestinationStatistics().getDispatched().increment(); 719 regionDestination.getDestinationStatistics().getInflight().increment(); 720 node.decrementReferenceCount(); 721 } 722 } 723 } 724 725 private void discard(MessageReference message) { 726 discarding = true; 727 try { 728 message.decrementReferenceCount(); 729 matched.remove(message); 730 discarded++; 731 if (destination != null) { 732 destination.getDestinationStatistics().getDequeues().increment(); 733 } 734 LOG.debug("{}, discarding message {}", this, message); 735 Destination dest = (Destination) message.getRegionDestination(); 736 if (dest != null) { 737 dest.messageDiscarded(getContext(), this, message); 738 } 739 broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId())); 740 } finally { 741 discarding = false; 742 } 743 } 744 745 @Override 746 public String toString() { 747 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" 748 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get() 749 + ", usePrefetchExtension=" + isUsePrefetchExtension(); 750 } 751 752 @Override 753 public void destroy() { 754 this.active=false; 755 synchronized (matchedListMutex) { 756 try { 757 matched.destroy(); 758 } catch (Exception e) { 759 LOG.warn("Failed to destroy cursor", e); 760 } 761 } 762 setSlowConsumer(false); 763 synchronized(dispatchLock) { 764 dispatched.clear(); 765 } 766 } 767 768 @Override 769 public int getPrefetchSize() { 770 return info.getPrefetchSize(); 771 } 772 773 @Override 774 public void setPrefetchSize(int newSize) { 775 info.setPrefetchSize(newSize); 776 try { 777 dispatchMatched(); 778 } catch(Exception e) { 779 LOG.trace("Caught exception on dispatch after prefetch size change."); 780 } 781 } 782 783 public boolean isUseTopicSubscriptionInflightStats() { 784 return useTopicSubscriptionInflightStats; 785 } 786 787 public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) { 788 this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats; 789 } 790 791 private static class DispatchedNode { 792 private final int size; 793 private final MessageId messageId; 794 private final Destination destination; 795 796 public DispatchedNode(final MessageReference node) { 797 super(); 798 this.size = node.getSize(); 799 this.messageId = node.getMessageId(); 800 this.destination = node.getRegionDestination() instanceof Destination ? 801 ((Destination)node.getRegionDestination()) : null; 802 } 803 804 public long getSize() { 805 return size; 806 } 807 808 public MessageId getMessageId() { 809 return messageId; 810 } 811 812 public Destination getDestination() { 813 return destination; 814 } 815 } 816 817}