001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.LinkedList;
021 import java.util.concurrent.atomic.AtomicLong;
022
023 import javax.jms.JMSException;
024
025 import org.apache.activemq.ActiveMQMessageAudit;
026 import org.apache.activemq.broker.Broker;
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
029 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
030 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
031 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
032 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
033 import org.apache.activemq.command.ConsumerControl;
034 import org.apache.activemq.command.ConsumerInfo;
035 import org.apache.activemq.command.Message;
036 import org.apache.activemq.command.MessageAck;
037 import org.apache.activemq.command.MessageDispatch;
038 import org.apache.activemq.command.MessageDispatchNotification;
039 import org.apache.activemq.command.MessagePull;
040 import org.apache.activemq.command.Response;
041 import org.apache.activemq.transaction.Synchronization;
042 import org.apache.activemq.usage.SystemUsage;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046 public class TopicSubscription extends AbstractSubscription {
047
048 private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
049 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
050
051 protected PendingMessageCursor matched;
052 protected final SystemUsage usageManager;
053 protected AtomicLong dispatchedCounter = new AtomicLong();
054
055 boolean singleDestination = true;
056 Destination destination;
057
058 private int maximumPendingMessages = -1;
059 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
060 private int discarded;
061 private final Object matchedListMutex = new Object();
062 private final AtomicLong enqueueCounter = new AtomicLong(0);
063 private final AtomicLong dequeueCounter = new AtomicLong(0);
064 private int memoryUsageHighWaterMark = 95;
065 // allow duplicate suppression in a ring network of brokers
066 protected int maxProducersToAudit = 1024;
067 protected int maxAuditDepth = 1000;
068 protected boolean enableAudit = false;
069 protected ActiveMQMessageAudit audit;
070 protected boolean active = false;
071
072 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
073 super(broker, context, info);
074 this.usageManager = usageManager;
075 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
076 if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
077 this.matched = new VMPendingMessageCursor(false);
078 } else {
079 this.matched = new FilePendingMessageCursor(broker,matchedName,false);
080 }
081 }
082
083 public void init() throws Exception {
084 this.matched.setSystemUsage(usageManager);
085 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
086 this.matched.start();
087 if (enableAudit) {
088 audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
089 }
090 this.active=true;
091 }
092
093 public void add(MessageReference node) throws Exception {
094 if (isDuplicate(node)) {
095 return;
096 }
097 enqueueCounter.incrementAndGet();
098 if (!isFull() && matched.isEmpty() && !isSlave()) {
099 // if maximumPendingMessages is set we will only discard messages which
100 // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
101 dispatch(node);
102 setSlowConsumer(false);
103 } else {
104 if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
105 // Slow consumers should log and set their state as such.
106 if (!isSlowConsumer()) {
107 LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
108 setSlowConsumer(true);
109 for (Destination dest: destinations) {
110 dest.slowConsumer(getContext(), this);
111 }
112 }
113 }
114 if (maximumPendingMessages != 0) {
115 boolean warnedAboutWait = false;
116 while (active) {
117 synchronized (matchedListMutex) {
118 while (matched.isFull()) {
119 if (getContext().getStopping().get()) {
120 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
121 + node.getMessageId());
122 enqueueCounter.decrementAndGet();
123 return;
124 }
125 if (!warnedAboutWait) {
126 LOG.info(toString() + ": Pending message cursor [" + matched
127 + "] is full, temp usage ("
128 + +matched.getSystemUsage().getTempUsage().getPercentUsage()
129 + "%) or memory usage ("
130 + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
131 + "%) limit reached, blocking message add() pending the release of resources.");
132 warnedAboutWait = true;
133 }
134 matchedListMutex.wait(20);
135 }
136 // Temporary storage could be full - so just try to add the message
137 // see https://issues.apache.org/activemq/browse/AMQ-2475
138 if (matched.tryAddMessageLast(node, 10)) {
139 break;
140 }
141 }
142 }
143 synchronized (matchedListMutex) {
144 // NOTE - be careful about the slaveBroker!
145 if (maximumPendingMessages > 0) {
146 // calculate the high water mark from which point we
147 // will eagerly evict expired messages
148 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
149 if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
150 max = maximumPendingMessages;
151 }
152 if (!matched.isEmpty() && matched.size() > max) {
153 removeExpiredMessages();
154 }
155 // lets discard old messages as we are a slow consumer
156 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
157 int pageInSize = matched.size() - maximumPendingMessages;
158 // only page in a 1000 at a time - else we could blow the memory
159 pageInSize = Math.max(1000, pageInSize);
160 LinkedList<MessageReference> list = null;
161 MessageReference[] oldMessages=null;
162 synchronized(matched){
163 list = matched.pageInList(pageInSize);
164 oldMessages = messageEvictionStrategy.evictMessages(list);
165 for (MessageReference ref : list) {
166 ref.decrementReferenceCount();
167 }
168 }
169 int messagesToEvict = 0;
170 if (oldMessages != null){
171 messagesToEvict = oldMessages.length;
172 for (int i = 0; i < messagesToEvict; i++) {
173 MessageReference oldMessage = oldMessages[i];
174 discard(oldMessage);
175 }
176 }
177 // lets avoid an infinite loop if we are given a bad eviction strategy
178 // for a bad strategy lets just not evict
179 if (messagesToEvict == 0) {
180 LOG.warn("No messages to evict returned for " + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates");
181 break;
182 }
183 }
184 }
185 }
186 dispatchMatched();
187 }
188 }
189 }
190
191 private boolean isDuplicate(MessageReference node) {
192 boolean duplicate = false;
193 if (enableAudit && audit != null) {
194 duplicate = audit.isDuplicate(node);
195 if (LOG.isDebugEnabled()) {
196 if (duplicate) {
197 LOG.debug(this + ", ignoring duplicate add: " + node.getMessageId());
198 }
199 }
200 }
201 return duplicate;
202 }
203
204 /**
205 * Discard any expired messages from the matched list. Called from a
206 * synchronized block.
207 *
208 * @throws IOException
209 */
210 protected void removeExpiredMessages() throws IOException {
211 try {
212 matched.reset();
213 while (matched.hasNext()) {
214 MessageReference node = matched.next();
215 node.decrementReferenceCount();
216 if (broker.isExpired(node)) {
217 matched.remove();
218 dispatchedCounter.incrementAndGet();
219 node.decrementReferenceCount();
220 node.getRegionDestination().getDestinationStatistics().getExpired().increment();
221 broker.messageExpired(getContext(), node, this);
222 break;
223 }
224 }
225 } finally {
226 matched.release();
227 }
228 }
229
230 public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
231 synchronized (matchedListMutex) {
232 try {
233 matched.reset();
234 while (matched.hasNext()) {
235 MessageReference node = matched.next();
236 node.decrementReferenceCount();
237 if (node.getMessageId().equals(mdn.getMessageId())) {
238 matched.remove();
239 dispatchedCounter.incrementAndGet();
240 node.decrementReferenceCount();
241 break;
242 }
243 }
244 } finally {
245 matched.release();
246 }
247 }
248 }
249
250 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
251 // Handle the standard acknowledgment case.
252 if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
253 if (context.isInTransaction()) {
254 context.getTransaction().addSynchronization(new Synchronization() {
255
256 @Override
257 public void afterCommit() throws Exception {
258 synchronized (TopicSubscription.this) {
259 if (singleDestination && destination != null) {
260 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
261 }
262 }
263 dequeueCounter.addAndGet(ack.getMessageCount());
264 dispatchMatched();
265 }
266 });
267 } else {
268 if (singleDestination && destination != null) {
269 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
270 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
271 }
272 dequeueCounter.addAndGet(ack.getMessageCount());
273 }
274 dispatchMatched();
275 return;
276 } else if (ack.isDeliveredAck()) {
277 // Message was delivered but not acknowledged: update pre-fetch counters.
278 // also. get these for a consumer expired message.
279 if (destination != null && !ack.isInTransaction()) {
280 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
281 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
282 }
283 dequeueCounter.addAndGet(ack.getMessageCount());
284 dispatchMatched();
285 return;
286 } else if (ack.isRedeliveredAck()) {
287 // nothing to do atm
288 return;
289 }
290 throw new JMSException("Invalid acknowledgment: " + ack);
291 }
292
293 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
294 // not supported for topics
295 return null;
296 }
297
298 public int getPendingQueueSize() {
299 return matched();
300 }
301
302 public int getDispatchedQueueSize() {
303 return (int)(dispatchedCounter.get() - dequeueCounter.get());
304 }
305
306 public int getMaximumPendingMessages() {
307 return maximumPendingMessages;
308 }
309
310 public long getDispatchedCounter() {
311 return dispatchedCounter.get();
312 }
313
314 public long getEnqueueCounter() {
315 return enqueueCounter.get();
316 }
317
318 public long getDequeueCounter() {
319 return dequeueCounter.get();
320 }
321
322 /**
323 * @return the number of messages discarded due to being a slow consumer
324 */
325 public int discarded() {
326 synchronized (matchedListMutex) {
327 return discarded;
328 }
329 }
330
331 /**
332 * @return the number of matched messages (messages targeted for the
333 * subscription but not yet able to be dispatched due to the
334 * prefetch buffer being full).
335 */
336 public int matched() {
337 synchronized (matchedListMutex) {
338 return matched.size();
339 }
340 }
341
342 /**
343 * Sets the maximum number of pending messages that can be matched against
344 * this consumer before old messages are discarded.
345 */
346 public void setMaximumPendingMessages(int maximumPendingMessages) {
347 this.maximumPendingMessages = maximumPendingMessages;
348 }
349
350 public MessageEvictionStrategy getMessageEvictionStrategy() {
351 return messageEvictionStrategy;
352 }
353
354 /**
355 * Sets the eviction strategy used to decide which message to evict when the
356 * slow consumer needs to discard messages
357 */
358 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
359 this.messageEvictionStrategy = messageEvictionStrategy;
360 }
361
362 public int getMaxProducersToAudit() {
363 return maxProducersToAudit;
364 }
365
366 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
367 this.maxProducersToAudit = maxProducersToAudit;
368 if (audit != null) {
369 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
370 }
371 }
372
373 public int getMaxAuditDepth() {
374 return maxAuditDepth;
375 }
376
377 public synchronized void setMaxAuditDepth(int maxAuditDepth) {
378 this.maxAuditDepth = maxAuditDepth;
379 if (audit != null) {
380 audit.setAuditDepth(maxAuditDepth);
381 }
382 }
383
384 public boolean isEnableAudit() {
385 return enableAudit;
386 }
387
388 public synchronized void setEnableAudit(boolean enableAudit) {
389 this.enableAudit = enableAudit;
390 if (enableAudit && audit == null) {
391 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
392 }
393 }
394
395 // Implementation methods
396 // -------------------------------------------------------------------------
397 public boolean isFull() {
398 return getDispatchedQueueSize() >= info.getPrefetchSize();
399 }
400
401 public int getInFlightSize() {
402 return getDispatchedQueueSize();
403 }
404
405 /**
406 * @return true when 60% or more room is left for dispatching messages
407 */
408 public boolean isLowWaterMark() {
409 return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
410 }
411
412 /**
413 * @return true when 10% or less room is left for dispatching messages
414 */
415 public boolean isHighWaterMark() {
416 return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
417 }
418
419 /**
420 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
421 */
422 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
423 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
424 }
425
426 /**
427 * @return the memoryUsageHighWaterMark
428 */
429 public int getMemoryUsageHighWaterMark() {
430 return this.memoryUsageHighWaterMark;
431 }
432
433 /**
434 * @return the usageManager
435 */
436 public SystemUsage getUsageManager() {
437 return this.usageManager;
438 }
439
440 /**
441 * @return the matched
442 */
443 public PendingMessageCursor getMatched() {
444 return this.matched;
445 }
446
447 /**
448 * @param matched the matched to set
449 */
450 public void setMatched(PendingMessageCursor matched) {
451 this.matched = matched;
452 }
453
454 /**
455 * inform the MessageConsumer on the client to change it's prefetch
456 *
457 * @param newPrefetch
458 */
459 public void updateConsumerPrefetch(int newPrefetch) {
460 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
461 ConsumerControl cc = new ConsumerControl();
462 cc.setConsumerId(info.getConsumerId());
463 cc.setPrefetch(newPrefetch);
464 context.getConnection().dispatchAsync(cc);
465 }
466 }
467
468 private void dispatchMatched() throws IOException {
469 synchronized (matchedListMutex) {
470 if (!matched.isEmpty() && !isFull()) {
471 try {
472 matched.reset();
473
474 while (matched.hasNext() && !isFull()) {
475 MessageReference message = matched.next();
476 message.decrementReferenceCount();
477 matched.remove();
478 // Message may have been sitting in the matched list a while
479 // waiting for the consumer to ak the message.
480 if (message.isExpired()) {
481 discard(message);
482 continue; // just drop it.
483 }
484 dispatch(message);
485 }
486 } finally {
487 matched.release();
488 }
489 }
490 }
491 }
492
493 private void dispatch(final MessageReference node) throws IOException {
494 Message message = (Message)node;
495 node.incrementReferenceCount();
496 // Make sure we can dispatch a message.
497 MessageDispatch md = new MessageDispatch();
498 md.setMessage(message);
499 md.setConsumerId(info.getConsumerId());
500 md.setDestination(node.getRegionDestination().getActiveMQDestination());
501 dispatchedCounter.incrementAndGet();
502 // Keep track if this subscription is receiving messages from a single destination.
503 if (singleDestination) {
504 if (destination == null) {
505 destination = node.getRegionDestination();
506 } else {
507 if (destination != node.getRegionDestination()) {
508 singleDestination = false;
509 }
510 }
511 }
512 if (info.isDispatchAsync()) {
513 md.setTransmitCallback(new Runnable() {
514
515 public void run() {
516 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
517 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
518 node.decrementReferenceCount();
519 }
520 });
521 context.getConnection().dispatchAsync(md);
522 } else {
523 context.getConnection().dispatchSync(md);
524 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
525 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
526 node.decrementReferenceCount();
527 }
528 }
529
530 private void discard(MessageReference message) {
531 message.decrementReferenceCount();
532 matched.remove(message);
533 discarded++;
534 if(destination != null) {
535 destination.getDestinationStatistics().getDequeues().increment();
536 }
537 if (LOG.isDebugEnabled()) {
538 LOG.debug(this + ", discarding message " + message);
539 }
540 Destination dest = message.getRegionDestination();
541 if (dest != null) {
542 dest.messageDiscarded(getContext(), this, message);
543 }
544 broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
545 }
546
547 @Override
548 public String toString() {
549 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
550 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
551 }
552
553 public void destroy() {
554 this.active=false;
555 synchronized (matchedListMutex) {
556 try {
557 matched.destroy();
558 } catch (Exception e) {
559 LOG.warn("Failed to destroy cursor", e);
560 }
561 }
562 setSlowConsumer(false);
563 }
564
565 @Override
566 public int getPrefetchSize() {
567 return info.getPrefetchSize();
568 }
569
570 @Override
571 public void setPrefetchSize(int newSize) {
572 info.setPrefetchSize(newSize);
573 try {
574 dispatchMatched();
575 } catch(Exception e) {
576 LOG.trace("Caught exception on dispatch after prefetch size change.");
577 }
578 }
579 }