001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.LinkedList;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    import java.util.concurrent.atomic.AtomicLong;
023    
024    import javax.jms.JMSException;
025    
026    import org.apache.activemq.ActiveMQMessageAudit;
027    import org.apache.activemq.broker.Broker;
028    import org.apache.activemq.broker.ConnectionContext;
029    import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
030    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
032    import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
033    import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
034    import org.apache.activemq.command.ConsumerControl;
035    import org.apache.activemq.command.ConsumerInfo;
036    import org.apache.activemq.command.Message;
037    import org.apache.activemq.command.MessageAck;
038    import org.apache.activemq.command.MessageDispatch;
039    import org.apache.activemq.command.MessageDispatchNotification;
040    import org.apache.activemq.command.MessagePull;
041    import org.apache.activemq.command.Response;
042    import org.apache.activemq.thread.Scheduler;
043    import org.apache.activemq.transaction.Synchronization;
044    import org.apache.activemq.transport.TransmitCallback;
045    import org.apache.activemq.usage.SystemUsage;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    public class TopicSubscription extends AbstractSubscription {
050    
051        private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
052        private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
053    
054        protected PendingMessageCursor matched;
055        protected final SystemUsage usageManager;
056        protected AtomicLong dispatchedCounter = new AtomicLong();
057    
058        boolean singleDestination = true;
059        Destination destination;
060        private final Scheduler scheduler;
061    
062        private int maximumPendingMessages = -1;
063        private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
064        private int discarded;
065        private final Object matchedListMutex = new Object();
066        private final AtomicLong enqueueCounter = new AtomicLong(0);
067        private final AtomicLong dequeueCounter = new AtomicLong(0);
068        private final AtomicBoolean prefetchWindowOpen = new AtomicBoolean(false);
069        private int memoryUsageHighWaterMark = 95;
070        // allow duplicate suppression in a ring network of brokers
071        protected int maxProducersToAudit = 1024;
072        protected int maxAuditDepth = 1000;
073        protected boolean enableAudit = false;
074        protected ActiveMQMessageAudit audit;
075        protected boolean active = false;
076    
077        public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
078            super(broker, context, info);
079            this.usageManager = usageManager;
080            String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
081            if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) {
082                this.matched = new VMPendingMessageCursor(false);
083            } else {
084                this.matched = new FilePendingMessageCursor(broker,matchedName,false);
085            }
086    
087            this.scheduler = broker.getScheduler();
088        }
089    
090        public void init() throws Exception {
091            this.matched.setSystemUsage(usageManager);
092            this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
093            this.matched.start();
094            if (enableAudit) {
095                audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
096            }
097            this.active=true;
098        }
099    
100        @Override
101        public void add(MessageReference node) throws Exception {
102            if (isDuplicate(node)) {
103                return;
104            }
105            // Lets use an indirect reference so that we can associate a unique
106            // locator /w the message.
107            node = new IndirectMessageReference(node.getMessage());
108            enqueueCounter.incrementAndGet();
109            if (!isFull() && matched.isEmpty()) {
110                // if maximumPendingMessages is set we will only discard messages which
111                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
112                dispatch(node);
113                setSlowConsumer(false);
114            } else {
115                if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
116                    // Slow consumers should log and set their state as such.
117                    if (!isSlowConsumer()) {
118                        LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
119                        setSlowConsumer(true);
120                        for (Destination dest: destinations) {
121                            dest.slowConsumer(getContext(), this);
122                        }
123                    }
124                }
125                if (maximumPendingMessages != 0) {
126                    boolean warnedAboutWait = false;
127                    while (active) {
128                        synchronized (matchedListMutex) {
129                            while (matched.isFull()) {
130                                if (getContext().getStopping().get()) {
131                                    LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
132                                            + node.getMessageId());
133                                    enqueueCounter.decrementAndGet();
134                                    return;
135                                }
136                                if (!warnedAboutWait) {
137                                    LOG.info(toString() + ": Pending message cursor [" + matched
138                                            + "] is full, temp usage ("
139                                            + +matched.getSystemUsage().getTempUsage().getPercentUsage()
140                                            + "%) or memory usage ("
141                                            + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
142                                            + "%) limit reached, blocking message add() pending the release of resources.");
143                                    warnedAboutWait = true;
144                                }
145                                matchedListMutex.wait(20);
146                            }
147                            // Temporary storage could be full - so just try to add the message
148                            // see https://issues.apache.org/activemq/browse/AMQ-2475
149                            if (matched.tryAddMessageLast(node, 10)) {
150                                break;
151                            }
152                        }
153                    }
154                    synchronized (matchedListMutex) {
155                        // NOTE - be careful about the slaveBroker!
156                        if (maximumPendingMessages > 0) {
157                            // calculate the high water mark from which point we
158                            // will eagerly evict expired messages
159                            int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
160                            if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
161                                max = maximumPendingMessages;
162                            }
163                            if (!matched.isEmpty() && matched.size() > max) {
164                                removeExpiredMessages();
165                            }
166                            // lets discard old messages as we are a slow consumer
167                            while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
168                                int pageInSize = matched.size() - maximumPendingMessages;
169                                // only page in a 1000 at a time - else we could blow the memory
170                                pageInSize = Math.max(1000, pageInSize);
171                                LinkedList<MessageReference> list = null;
172                                MessageReference[] oldMessages=null;
173                                synchronized(matched){
174                                    list = matched.pageInList(pageInSize);
175                                    oldMessages = messageEvictionStrategy.evictMessages(list);
176                                    for (MessageReference ref : list) {
177                                        ref.decrementReferenceCount();
178                                    }
179                                }
180                                int messagesToEvict = 0;
181                                if (oldMessages != null){
182                                    messagesToEvict = oldMessages.length;
183                                    for (int i = 0; i < messagesToEvict; i++) {
184                                        MessageReference oldMessage = oldMessages[i];
185                                        discard(oldMessage);
186                                    }
187                                }
188                                // lets avoid an infinite loop if we are given a bad eviction strategy
189                                // for a bad strategy lets just not evict
190                                if (messagesToEvict == 0) {
191                                    LOG.warn("No messages to evict returned for "  + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates");
192                                    break;
193                                }
194                            }
195                        }
196                    }
197                    dispatchMatched();
198                }
199            }
200        }
201    
202        private boolean isDuplicate(MessageReference node) {
203            boolean duplicate = false;
204            if (enableAudit && audit != null) {
205                duplicate = audit.isDuplicate(node);
206                if (LOG.isDebugEnabled()) {
207                    if (duplicate) {
208                        LOG.debug(this + ", ignoring duplicate add: " + node.getMessageId());
209                    }
210                }
211            }
212            return duplicate;
213        }
214    
215        /**
216         * Discard any expired messages from the matched list. Called from a
217         * synchronized block.
218         *
219         * @throws IOException
220         */
221        protected void removeExpiredMessages() throws IOException {
222            try {
223                matched.reset();
224                while (matched.hasNext()) {
225                    MessageReference node = matched.next();
226                    node.decrementReferenceCount();
227                    if (broker.isExpired(node)) {
228                        matched.remove();
229                        dispatchedCounter.incrementAndGet();
230                        node.decrementReferenceCount();
231                        ((Destination)node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
232                        broker.messageExpired(getContext(), node, this);
233                        break;
234                    }
235                }
236            } finally {
237                matched.release();
238            }
239        }
240    
241        @Override
242        public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
243            synchronized (matchedListMutex) {
244                try {
245                    matched.reset();
246                    while (matched.hasNext()) {
247                        MessageReference node = matched.next();
248                        node.decrementReferenceCount();
249                        if (node.getMessageId().equals(mdn.getMessageId())) {
250                            matched.remove();
251                            dispatchedCounter.incrementAndGet();
252                            node.decrementReferenceCount();
253                            break;
254                        }
255                    }
256                } finally {
257                    matched.release();
258                }
259            }
260        }
261    
262        @Override
263        public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
264            // Handle the standard acknowledgment case.
265            if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
266                if (context.isInTransaction()) {
267                    context.getTransaction().addSynchronization(new Synchronization() {
268    
269                        @Override
270                        public void afterCommit() throws Exception {
271                           synchronized (TopicSubscription.this) {
272                                if (singleDestination && destination != null) {
273                                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
274                                }
275                            }
276                            dequeueCounter.addAndGet(ack.getMessageCount());
277                            dispatchMatched();
278                        }
279                    });
280                } else {
281                    if (singleDestination && destination != null) {
282                        destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
283                        destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
284                    }
285                    dequeueCounter.addAndGet(ack.getMessageCount());
286                }
287                dispatchMatched();
288                return;
289            } else if (ack.isDeliveredAck()) {
290                // Message was delivered but not acknowledged: update pre-fetch counters.
291                // also. get these for a consumer expired message.
292                if (destination != null && !ack.isInTransaction()) {
293                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
294                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
295                }
296                dequeueCounter.addAndGet(ack.getMessageCount());
297                dispatchMatched();
298                return;
299            } else if (ack.isRedeliveredAck()) {
300                // nothing to do atm
301                return;
302            }
303            throw new JMSException("Invalid acknowledgment: " + ack);
304        }
305    
306        @Override
307        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
308    
309            // The slave should not deliver pull messages.
310            if (getPrefetchSize() == 0 ) {
311    
312                prefetchWindowOpen.set(true);
313                dispatchMatched();
314    
315                // If there was nothing dispatched.. we may need to setup a timeout.
316                if (prefetchWindowOpen.get()) {
317    
318                    // immediate timeout used by receiveNoWait()
319                    if (pull.getTimeout() == -1) {
320                        prefetchWindowOpen.set(false);
321                        // Send a NULL message to signal nothing pending.
322                        dispatch(null);
323                    }
324    
325                    if (pull.getTimeout() > 0) {
326                        scheduler.executeAfterDelay(new Runnable() {
327    
328                            @Override
329                            public void run() {
330                                pullTimeout();
331                            }
332                        }, pull.getTimeout());
333                    }
334                }
335            }
336            return null;
337        }
338    
339        /**
340         * Occurs when a pull times out. If nothing has been dispatched since the
341         * timeout was setup, then send the NULL message.
342         */
343        private final void pullTimeout() {
344            synchronized (matchedListMutex) {
345                if (prefetchWindowOpen.compareAndSet(true, false)) {
346                    try {
347                        dispatch(null);
348                    } catch (Exception e) {
349                        context.getConnection().serviceException(e);
350                    }
351                }
352            }
353        }
354    
355        @Override
356        public int getPendingQueueSize() {
357            return matched();
358        }
359    
360        @Override
361        public int getDispatchedQueueSize() {
362            return (int)(dispatchedCounter.get() - dequeueCounter.get());
363        }
364    
365        public int getMaximumPendingMessages() {
366            return maximumPendingMessages;
367        }
368    
369        @Override
370        public long getDispatchedCounter() {
371            return dispatchedCounter.get();
372        }
373    
374        @Override
375        public long getEnqueueCounter() {
376            return enqueueCounter.get();
377        }
378    
379        @Override
380        public long getDequeueCounter() {
381            return dequeueCounter.get();
382        }
383    
384        /**
385         * @return the number of messages discarded due to being a slow consumer
386         */
387        public int discarded() {
388            synchronized (matchedListMutex) {
389                return discarded;
390            }
391        }
392    
393        /**
394         * @return the number of matched messages (messages targeted for the
395         *         subscription but not yet able to be dispatched due to the
396         *         prefetch buffer being full).
397         */
398        public int matched() {
399            synchronized (matchedListMutex) {
400                return matched.size();
401            }
402        }
403    
404        /**
405         * Sets the maximum number of pending messages that can be matched against
406         * this consumer before old messages are discarded.
407         */
408        public void setMaximumPendingMessages(int maximumPendingMessages) {
409            this.maximumPendingMessages = maximumPendingMessages;
410        }
411    
412        public MessageEvictionStrategy getMessageEvictionStrategy() {
413            return messageEvictionStrategy;
414        }
415    
416        /**
417         * Sets the eviction strategy used to decide which message to evict when the
418         * slow consumer needs to discard messages
419         */
420        public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
421            this.messageEvictionStrategy = messageEvictionStrategy;
422        }
423    
424        public int getMaxProducersToAudit() {
425            return maxProducersToAudit;
426        }
427    
428        public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
429            this.maxProducersToAudit = maxProducersToAudit;
430            if (audit != null) {
431                audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
432            }
433        }
434    
435        public int getMaxAuditDepth() {
436            return maxAuditDepth;
437        }
438    
439        public synchronized void setMaxAuditDepth(int maxAuditDepth) {
440            this.maxAuditDepth = maxAuditDepth;
441            if (audit != null) {
442                audit.setAuditDepth(maxAuditDepth);
443            }
444        }
445    
446        public boolean isEnableAudit() {
447            return enableAudit;
448        }
449    
450        public synchronized void setEnableAudit(boolean enableAudit) {
451            this.enableAudit = enableAudit;
452            if (enableAudit && audit == null) {
453                audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
454            }
455        }
456    
457        // Implementation methods
458        // -------------------------------------------------------------------------
459        @Override
460        public boolean isFull() {
461            return getDispatchedQueueSize() >= info.getPrefetchSize() && !prefetchWindowOpen.get();
462        }
463    
464        @Override
465        public int getInFlightSize() {
466            return getDispatchedQueueSize();
467        }
468    
469        /**
470         * @return true when 60% or more room is left for dispatching messages
471         */
472        @Override
473        public boolean isLowWaterMark() {
474            return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
475        }
476    
477        /**
478         * @return true when 10% or less room is left for dispatching messages
479         */
480        @Override
481        public boolean isHighWaterMark() {
482            return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
483        }
484    
485        /**
486         * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
487         */
488        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
489            this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
490        }
491    
492        /**
493         * @return the memoryUsageHighWaterMark
494         */
495        public int getMemoryUsageHighWaterMark() {
496            return this.memoryUsageHighWaterMark;
497        }
498    
499        /**
500         * @return the usageManager
501         */
502        public SystemUsage getUsageManager() {
503            return this.usageManager;
504        }
505    
506        /**
507         * @return the matched
508         */
509        public PendingMessageCursor getMatched() {
510            return this.matched;
511        }
512    
513        /**
514         * @param matched the matched to set
515         */
516        public void setMatched(PendingMessageCursor matched) {
517            this.matched = matched;
518        }
519    
520        /**
521         * inform the MessageConsumer on the client to change it's prefetch
522         *
523         * @param newPrefetch
524         */
525        @Override
526        public void updateConsumerPrefetch(int newPrefetch) {
527            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
528                ConsumerControl cc = new ConsumerControl();
529                cc.setConsumerId(info.getConsumerId());
530                cc.setPrefetch(newPrefetch);
531                context.getConnection().dispatchAsync(cc);
532            }
533        }
534    
535        private void dispatchMatched() throws IOException {
536            synchronized (matchedListMutex) {
537                if (!matched.isEmpty() && !isFull()) {
538                    try {
539                        matched.reset();
540    
541                        while (matched.hasNext() && !isFull()) {
542                            MessageReference message = matched.next();
543                            message.decrementReferenceCount();
544                            matched.remove();
545                            // Message may have been sitting in the matched list a while
546                            // waiting for the consumer to ak the message.
547                            if (message.isExpired()) {
548                                discard(message);
549                                continue; // just drop it.
550                            }
551                            dispatch(message);
552                            prefetchWindowOpen.set(false);
553                        }
554                    } finally {
555                        matched.release();
556                    }
557                }
558            }
559        }
560    
561        private void dispatch(final MessageReference node) throws IOException {
562            Message message = node.getMessage();
563            if (node != null) {
564                node.incrementReferenceCount();
565            }
566            // Make sure we can dispatch a message.
567            MessageDispatch md = new MessageDispatch();
568            md.setMessage(message);
569            md.setConsumerId(info.getConsumerId());
570            if (node != null) {
571                md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
572                dispatchedCounter.incrementAndGet();
573                // Keep track if this subscription is receiving messages from a single destination.
574                if (singleDestination) {
575                    if (destination == null) {
576                        destination = (Destination)node.getRegionDestination();
577                    } else {
578                        if (destination != node.getRegionDestination()) {
579                            singleDestination = false;
580                        }
581                    }
582                }
583            }
584            if (info.isDispatchAsync()) {
585                if (node != null) {
586                    md.setTransmitCallback(new TransmitCallback() {
587    
588                        @Override
589                        public void onSuccess() {
590                            Destination regionDestination = (Destination) node.getRegionDestination();
591                            regionDestination.getDestinationStatistics().getDispatched().increment();
592                            regionDestination.getDestinationStatistics().getInflight().increment();
593                            node.decrementReferenceCount();
594                        }
595    
596                        @Override
597                        public void onFailure() {
598                            Destination regionDestination = (Destination) node.getRegionDestination();
599                            regionDestination.getDestinationStatistics().getDispatched().increment();
600                            regionDestination.getDestinationStatistics().getInflight().increment();
601                            node.decrementReferenceCount();
602                        }
603                    });
604                }
605                context.getConnection().dispatchAsync(md);
606            } else {
607                context.getConnection().dispatchSync(md);
608                if (node != null) {
609                    Destination regionDestination = (Destination) node.getRegionDestination();
610                    regionDestination.getDestinationStatistics().getDispatched().increment();
611                    regionDestination.getDestinationStatistics().getInflight().increment();
612                    node.decrementReferenceCount();
613                }
614            }
615        }
616    
617        private void discard(MessageReference message) {
618            message.decrementReferenceCount();
619            matched.remove(message);
620            discarded++;
621            if(destination != null) {
622                destination.getDestinationStatistics().getDequeues().increment();
623            }
624            if (LOG.isDebugEnabled()) {
625                LOG.debug(this + ", discarding message " + message);
626            }
627            Destination dest = (Destination) message.getRegionDestination();
628            if (dest != null) {
629                dest.messageDiscarded(getContext(), this, message);
630            }
631            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
632        }
633    
634        @Override
635        public String toString() {
636            return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
637                   + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
638        }
639    
640        @Override
641        public void destroy() {
642            this.active=false;
643            synchronized (matchedListMutex) {
644                try {
645                    matched.destroy();
646                } catch (Exception e) {
647                    LOG.warn("Failed to destroy cursor", e);
648                }
649            }
650            setSlowConsumer(false);
651        }
652    
653        @Override
654        public int getPrefetchSize() {
655            return info.getPrefetchSize();
656        }
657    
658        @Override
659        public void setPrefetchSize(int newSize) {
660            info.setPrefetchSize(newSize);
661            try {
662                dispatchMatched();
663            } catch(Exception e) {
664                LOG.trace("Caught exception on dispatch after prefetch size change.");
665            }
666        }
667    }