001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.LinkedList;
021    import java.util.concurrent.atomic.AtomicLong;
022    
023    import javax.jms.JMSException;
024    
025    import org.apache.activemq.ActiveMQMessageAudit;
026    import org.apache.activemq.broker.Broker;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
029    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
030    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
031    import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
032    import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
033    import org.apache.activemq.command.ConsumerControl;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.Message;
036    import org.apache.activemq.command.MessageAck;
037    import org.apache.activemq.command.MessageDispatch;
038    import org.apache.activemq.command.MessageDispatchNotification;
039    import org.apache.activemq.command.MessagePull;
040    import org.apache.activemq.command.Response;
041    import org.apache.activemq.transaction.Synchronization;
042    import org.apache.activemq.usage.SystemUsage;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    public class TopicSubscription extends AbstractSubscription {
047    
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
    /** Supplies the sequence number that the constructor embeds in each cursor name. */
    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);

    /** Cursor holding messages that matched this subscription but have not been dispatched yet. */
    protected PendingMessageCursor matched;
    /** Usage manager handed to the cursor in {@link #init()}. */
    protected final SystemUsage usageManager;
    /**
     * Incremented on every dispatch and also whenever a message is removed from
     * {@link #matched} by expiry or by a dispatch notification.
     */
    protected AtomicLong dispatchedCounter = new AtomicLong();

    /** Stays true until dispatch() sees a message from a second region destination. */
    boolean singleDestination = true;
    /** Region destination of the first message dispatched; null until then. */
    Destination destination;

    /**
     * Pending-message limit used by add(): 0 means undispatchable messages are
     * not queued at all, a positive value triggers eviction above that size,
     * and a negative value (the default) disables eviction.
     */
    private int maximumPendingMessages = -1;
    /** Strategy asked to pick the messages to evict once the pending limit is exceeded. */
    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
    /** Number of messages passed to discard(); read under {@link #matchedListMutex}. */
    private int discarded;
    /** Monitor guarding iteration and mutation of {@link #matched}. */
    private final Object matchedListMutex = new Object();
    /** Number of non-duplicate messages offered through add(). */
    private final AtomicLong enqueueCounter = new AtomicLong(0);
    /** Number of messages acknowledged by the consumer. */
    private final AtomicLong dequeueCounter = new AtomicLong(0);
    // NOTE(review): only read/written by its getter and setter in this class;
    // init() configures the cursor from getCursorMemoryHighWaterMark() instead - confirm intent.
    private int memoryUsageHighWaterMark = 95;
    // allow duplicate suppression in a ring network of brokers
    protected int maxProducersToAudit = 1024;
    protected int maxAuditDepth = 1000;
    /** When true, add() consults {@link #audit} and drops messages it reports as duplicates. */
    protected boolean enableAudit = false;
    /** Duplicate detector; created by init() or setEnableAudit(true). */
    protected ActiveMQMessageAudit audit;
    /** Set by init(), cleared by destroy(); add() stops retrying the cursor once this is false. */
    protected boolean active = false;
071    
072        public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
073            super(broker, context, info);
074            this.usageManager = usageManager;
075            String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
076            if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
077                this.matched = new VMPendingMessageCursor(false);
078            } else {
079                this.matched = new FilePendingMessageCursor(broker,matchedName,false);
080            }
081        }
082    
083        public void init() throws Exception {
084            this.matched.setSystemUsage(usageManager);
085            this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
086            this.matched.start();
087            if (enableAudit) {
088                audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
089            }
090            this.active=true;
091        }
092    
093        public void add(MessageReference node) throws Exception {
094            if (isDuplicate(node)) {
095                return;
096            }
097            enqueueCounter.incrementAndGet();
098            if (!isFull() && matched.isEmpty()  && !isSlave()) {
099                // if maximumPendingMessages is set we will only discard messages which
100                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
101                dispatch(node);
102                setSlowConsumer(false);
103            } else {
104                if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
105                    // Slow consumers should log and set their state as such.
106                    if (!isSlowConsumer()) {
107                        LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
108                        setSlowConsumer(true);
109                        for (Destination dest: destinations) {
110                            dest.slowConsumer(getContext(), this);
111                        }
112                    }
113                }
114                if (maximumPendingMessages != 0) {
115                    boolean warnedAboutWait = false;
116                    while (active) {
117                        synchronized (matchedListMutex) {
118                            while (matched.isFull()) {
119                                if (getContext().getStopping().get()) {
120                                    LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
121                                            + node.getMessageId());
122                                    enqueueCounter.decrementAndGet();
123                                    return;
124                                }
125                                if (!warnedAboutWait) {
126                                    LOG.info(toString() + ": Pending message cursor [" + matched
127                                            + "] is full, temp usage ("
128                                            + +matched.getSystemUsage().getTempUsage().getPercentUsage()
129                                            + "%) or memory usage ("
130                                            + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
131                                            + "%) limit reached, blocking message add() pending the release of resources.");
132                                    warnedAboutWait = true;
133                                }
134                                matchedListMutex.wait(20);
135                            }
136                            // Temporary storage could be full - so just try to add the message
137                            // see https://issues.apache.org/activemq/browse/AMQ-2475
138                            if (matched.tryAddMessageLast(node, 10)) {
139                                break;
140                            }
141                        }
142                    }
143                    synchronized (matchedListMutex) {
144                        // NOTE - be careful about the slaveBroker!
145                        if (maximumPendingMessages > 0) {
146                            // calculate the high water mark from which point we
147                            // will eagerly evict expired messages
148                            int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
149                            if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
150                                max = maximumPendingMessages;
151                            }
152                            if (!matched.isEmpty() && matched.size() > max) {
153                                removeExpiredMessages();
154                            }
155                            // lets discard old messages as we are a slow consumer
156                            while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
157                                int pageInSize = matched.size() - maximumPendingMessages;
158                                // only page in a 1000 at a time - else we could blow the memory
159                                pageInSize = Math.max(1000, pageInSize);
160                                LinkedList<MessageReference> list = null;
161                                MessageReference[] oldMessages=null;
162                                synchronized(matched){
163                                    list = matched.pageInList(pageInSize);
164                                    oldMessages = messageEvictionStrategy.evictMessages(list);
165                                    for (MessageReference ref : list) {
166                                        ref.decrementReferenceCount();
167                                    }
168                                }
169                                int messagesToEvict = 0;
170                                if (oldMessages != null){
171                                    messagesToEvict = oldMessages.length;
172                                    for (int i = 0; i < messagesToEvict; i++) {
173                                        MessageReference oldMessage = oldMessages[i];
174                                        discard(oldMessage);
175                                    }
176                                }
177                                // lets avoid an infinite loop if we are given a bad eviction strategy
178                                // for a bad strategy lets just not evict
179                                if (messagesToEvict == 0) {
180                                    LOG.warn("No messages to evict returned for "  + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates");
181                                    break;
182                                }
183                            }
184                        }
185                    }
186                    dispatchMatched();
187                }
188            }
189        }
190    
191        private boolean isDuplicate(MessageReference node) {
192            boolean duplicate = false;
193            if (enableAudit && audit != null) {
194                duplicate = audit.isDuplicate(node);
195                if (LOG.isDebugEnabled()) {
196                    if (duplicate) {
197                        LOG.debug(this + ", ignoring duplicate add: " + node.getMessageId());
198                    }
199                }
200            }
201            return duplicate;
202        }
203    
204        /**
205         * Discard any expired messages from the matched list. Called from a
206         * synchronized block.
207         *
208         * @throws IOException
209         */
210        protected void removeExpiredMessages() throws IOException {
211            try {
212                matched.reset();
213                while (matched.hasNext()) {
214                    MessageReference node = matched.next();
215                    node.decrementReferenceCount();
216                    if (broker.isExpired(node)) {
217                        matched.remove();
218                        dispatchedCounter.incrementAndGet();
219                        node.decrementReferenceCount();
220                        node.getRegionDestination().getDestinationStatistics().getExpired().increment();
221                        broker.messageExpired(getContext(), node, this);
222                        break;
223                    }
224                }
225            } finally {
226                matched.release();
227            }
228        }
229    
230        public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
231            synchronized (matchedListMutex) {
232                try {
233                    matched.reset();
234                    while (matched.hasNext()) {
235                        MessageReference node = matched.next();
236                        node.decrementReferenceCount();
237                        if (node.getMessageId().equals(mdn.getMessageId())) {
238                            matched.remove();
239                            dispatchedCounter.incrementAndGet();
240                            node.decrementReferenceCount();
241                            break;
242                        }
243                    }
244                } finally {
245                    matched.release();
246                }
247            }
248        }
249    
250        public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
251            // Handle the standard acknowledgment case.
252            if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
253                if (context.isInTransaction()) {
254                    context.getTransaction().addSynchronization(new Synchronization() {
255    
256                        @Override
257                        public void afterCommit() throws Exception {
258                           synchronized (TopicSubscription.this) {
259                                if (singleDestination && destination != null) {
260                                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
261                                }
262                            }
263                            dequeueCounter.addAndGet(ack.getMessageCount());
264                            dispatchMatched();
265                        }
266                    });
267                } else {
268                    if (singleDestination && destination != null) {
269                        destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
270                        destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
271                    }
272                    dequeueCounter.addAndGet(ack.getMessageCount());
273                }
274                dispatchMatched();
275                return;
276            } else if (ack.isDeliveredAck()) {
277                // Message was delivered but not acknowledged: update pre-fetch counters.
278                // also. get these for a consumer expired message.
279                if (destination != null && !ack.isInTransaction()) {
280                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
281                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
282                }
283                dequeueCounter.addAndGet(ack.getMessageCount());
284                dispatchMatched();
285                return;
286            } else if (ack.isRedeliveredAck()) {
287                // nothing to do atm
288                return;
289            }
290            throw new JMSException("Invalid acknowledgment: " + ack);
291        }
292    
293        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
294            // not supported for topics
295            return null;
296        }
297    
298        public int getPendingQueueSize() {
299            return matched();
300        }
301    
302        public int getDispatchedQueueSize() {
303            return (int)(dispatchedCounter.get() - dequeueCounter.get());
304        }
305    
306        public int getMaximumPendingMessages() {
307            return maximumPendingMessages;
308        }
309    
310        public long getDispatchedCounter() {
311            return dispatchedCounter.get();
312        }
313    
314        public long getEnqueueCounter() {
315            return enqueueCounter.get();
316        }
317    
318        public long getDequeueCounter() {
319            return dequeueCounter.get();
320        }
321    
322        /**
323         * @return the number of messages discarded due to being a slow consumer
324         */
325        public int discarded() {
326            synchronized (matchedListMutex) {
327                return discarded;
328            }
329        }
330    
331        /**
332         * @return the number of matched messages (messages targeted for the
333         *         subscription but not yet able to be dispatched due to the
334         *         prefetch buffer being full).
335         */
336        public int matched() {
337            synchronized (matchedListMutex) {
338                return matched.size();
339            }
340        }
341    
342        /**
343         * Sets the maximum number of pending messages that can be matched against
344         * this consumer before old messages are discarded.
345         */
346        public void setMaximumPendingMessages(int maximumPendingMessages) {
347            this.maximumPendingMessages = maximumPendingMessages;
348        }
349    
350        public MessageEvictionStrategy getMessageEvictionStrategy() {
351            return messageEvictionStrategy;
352        }
353    
354        /**
355         * Sets the eviction strategy used to decide which message to evict when the
356         * slow consumer needs to discard messages
357         */
358        public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
359            this.messageEvictionStrategy = messageEvictionStrategy;
360        }
361    
362        public int getMaxProducersToAudit() {
363            return maxProducersToAudit;
364        }
365    
366        public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
367            this.maxProducersToAudit = maxProducersToAudit;
368            if (audit != null) {
369                audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
370            }
371        }
372    
373        public int getMaxAuditDepth() {
374            return maxAuditDepth;
375        }
376    
377        public synchronized void setMaxAuditDepth(int maxAuditDepth) {
378            this.maxAuditDepth = maxAuditDepth;
379            if (audit != null) {
380                audit.setAuditDepth(maxAuditDepth);
381            }
382        }
383    
384        public boolean isEnableAudit() {
385            return enableAudit;
386        }
387    
388        public synchronized void setEnableAudit(boolean enableAudit) {
389            this.enableAudit = enableAudit;
390            if (enableAudit && audit == null) {
391                audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
392            }
393        }
394    
395        // Implementation methods
396        // -------------------------------------------------------------------------
397        public boolean isFull() {
398            return getDispatchedQueueSize() >= info.getPrefetchSize();
399        }
400    
401        public int getInFlightSize() {
402            return getDispatchedQueueSize();
403        }
404    
405        /**
406         * @return true when 60% or more room is left for dispatching messages
407         */
408        public boolean isLowWaterMark() {
409            return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
410        }
411    
412        /**
413         * @return true when 10% or less room is left for dispatching messages
414         */
415        public boolean isHighWaterMark() {
416            return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
417        }
418    
419        /**
420         * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
421         */
422        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
423            this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
424        }
425    
426        /**
427         * @return the memoryUsageHighWaterMark
428         */
429        public int getMemoryUsageHighWaterMark() {
430            return this.memoryUsageHighWaterMark;
431        }
432    
433        /**
434         * @return the usageManager
435         */
436        public SystemUsage getUsageManager() {
437            return this.usageManager;
438        }
439    
440        /**
441         * @return the matched
442         */
443        public PendingMessageCursor getMatched() {
444            return this.matched;
445        }
446    
447        /**
448         * @param matched the matched to set
449         */
450        public void setMatched(PendingMessageCursor matched) {
451            this.matched = matched;
452        }
453    
454        /**
455         * inform the MessageConsumer on the client to change it's prefetch
456         *
457         * @param newPrefetch
458         */
459        public void updateConsumerPrefetch(int newPrefetch) {
460            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
461                ConsumerControl cc = new ConsumerControl();
462                cc.setConsumerId(info.getConsumerId());
463                cc.setPrefetch(newPrefetch);
464                context.getConnection().dispatchAsync(cc);
465            }
466        }
467    
468        private void dispatchMatched() throws IOException {
469            synchronized (matchedListMutex) {
470                if (!matched.isEmpty() && !isFull()) {
471                    try {
472                        matched.reset();
473    
474                        while (matched.hasNext() && !isFull()) {
475                            MessageReference message = matched.next();
476                            message.decrementReferenceCount();
477                            matched.remove();
478                            // Message may have been sitting in the matched list a while
479                            // waiting for the consumer to ak the message.
480                            if (message.isExpired()) {
481                                discard(message);
482                                continue; // just drop it.
483                            }
484                            dispatch(message);
485                        }
486                    } finally {
487                        matched.release();
488                    }
489                }
490            }
491        }
492    
493        private void dispatch(final MessageReference node) throws IOException {
494            Message message = (Message)node;
495            node.incrementReferenceCount();
496            // Make sure we can dispatch a message.
497            MessageDispatch md = new MessageDispatch();
498            md.setMessage(message);
499            md.setConsumerId(info.getConsumerId());
500            md.setDestination(node.getRegionDestination().getActiveMQDestination());
501            dispatchedCounter.incrementAndGet();
502            // Keep track if this subscription is receiving messages from a single destination.
503            if (singleDestination) {
504                if (destination == null) {
505                    destination = node.getRegionDestination();
506                } else {
507                    if (destination != node.getRegionDestination()) {
508                        singleDestination = false;
509                    }
510                }
511            }
512            if (info.isDispatchAsync()) {
513                md.setTransmitCallback(new Runnable() {
514    
515                    public void run() {
516                        node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
517                        node.getRegionDestination().getDestinationStatistics().getInflight().increment();
518                        node.decrementReferenceCount();
519                    }
520                });
521                context.getConnection().dispatchAsync(md);
522            } else {
523                context.getConnection().dispatchSync(md);
524                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
525                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
526                node.decrementReferenceCount();
527            }
528        }
529    
530        private void discard(MessageReference message) {
531            message.decrementReferenceCount();
532            matched.remove(message);
533            discarded++;
534            if(destination != null) {
535                destination.getDestinationStatistics().getDequeues().increment();
536            }
537            if (LOG.isDebugEnabled()) {
538                LOG.debug(this + ", discarding message " + message);
539            }
540            Destination dest = message.getRegionDestination();
541            if (dest != null) {
542                dest.messageDiscarded(getContext(), this, message);
543            }
544            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
545        }
546    
547        @Override
548        public String toString() {
549            return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
550                   + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
551        }
552    
553        public void destroy() {
554            this.active=false;
555            synchronized (matchedListMutex) {
556                try {
557                    matched.destroy();
558                } catch (Exception e) {
559                    LOG.warn("Failed to destroy cursor", e);
560                }
561            }
562            setSlowConsumer(false);
563        }
564    
565        @Override
566        public int getPrefetchSize() {
567            return info.getPrefetchSize();
568        }
569    
570        @Override
571        public void setPrefetchSize(int newSize) {
572            info.setPrefetchSize(newSize);
573            try {
574                dispatchMatched();
575            } catch(Exception e) {
576                LOG.trace("Caught exception on dispatch after prefetch size change.");
577            }
578        }
579    }