001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.Collections;
021    import java.util.concurrent.ConcurrentHashMap;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import java.util.concurrent.atomic.AtomicLong;
024    
025    import javax.jms.InvalidSelectorException;
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.broker.Broker;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031    import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
032    import org.apache.activemq.broker.region.policy.PolicyEntry;
033    import org.apache.activemq.command.ActiveMQDestination;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.Message;
036    import org.apache.activemq.command.MessageAck;
037    import org.apache.activemq.command.MessageDispatch;
038    import org.apache.activemq.command.MessageId;
039    import org.apache.activemq.store.TopicMessageStore;
040    import org.apache.activemq.usage.SystemUsage;
041    import org.apache.activemq.usage.Usage;
042    import org.apache.activemq.usage.UsageListener;
043    import org.apache.activemq.util.SubscriptionKey;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
048    
049        private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
050        private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
051        private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
052        private final SubscriptionKey subscriptionKey;
053        private final boolean keepDurableSubsActive;
054        private AtomicBoolean active = new AtomicBoolean();
055        private AtomicLong offlineTimestamp = new AtomicLong(-1);
056    
057        public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
058                throws JMSException {
059            super(broker, usageManager, context, info);
060            this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
061            this.pending.setSystemUsage(usageManager);
062            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
063            this.keepDurableSubsActive = keepDurableSubsActive;
064            subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
065        }
066    
067        public final boolean isActive() {
068            return active.get();
069        }
070    
071        public final long getOfflineTimestamp() {
072            return offlineTimestamp.get();
073        }
074    
075        public boolean isFull() {
076            return !active.get() || super.isFull();
077        }
078    
079        public void gc() {
080        }
081    
082        /**
083         * store will have a pending ack for all durables, irrespective of the
084         * selector so we need to ack if node is un-matched
085         */
086        public void unmatched(MessageReference node) throws IOException {
087            MessageAck ack = new MessageAck();
088            ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
089            ack.setMessageID(node.getMessageId());
090            node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
091        }
092    
093        @Override
094        protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
095            // statically configured via maxPageSize
096        }
097    
098        public void add(ConnectionContext context, Destination destination) throws Exception {
099            if (!destinations.contains(destination)) {
100                super.add(context, destination);
101            }
102            // do it just once per destination
103            if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
104                return;
105            }
106            durableDestinations.put(destination.getActiveMQDestination(), destination);
107    
108            if (active.get() || keepDurableSubsActive) {
109                Topic topic = (Topic) destination;
110                topic.activate(context, this);
111                if (pending.isEmpty(topic)) {
112                    topic.recoverRetroactiveMessages(context, this);
113                }
114                this.enqueueCounter += pending.size();
115            } else if (destination.getMessageStore() != null) {
116                TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
117                try {
118                    this.enqueueCounter += store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName());
119                } catch (IOException e) {
120                    JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e);
121                    jmsEx.setLinkedException(e);
122                    throw jmsEx;
123                }
124            }
125            dispatchPending();
126        }
127    
128        public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
129            if (!active.get()) {
130                this.context = context;
131                this.info = info;
132    
133                // On Activation we should update the configuration based on our new consumer info.
134                ActiveMQDestination dest = this.info.getDestination();
135                if (dest != null && regionBroker.getDestinationPolicy() != null) {
136                    PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
137                    if (entry != null) {
138                        entry.configure(broker, usageManager, this);
139                    }
140                }
141    
142                LOG.debug("Activating " + this);
143                if (!keepDurableSubsActive) {
144                    for (Destination destination : durableDestinations.values()) {
145                        Topic topic = (Topic) destination;
146                        add(context, topic);
147                        topic.activate(context, this);
148                    }
149                }
150                synchronized (pendingLock) {
151                    pending.setSystemUsage(memoryManager);
152                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
153                    pending.setMaxAuditDepth(getMaxAuditDepth());
154                    pending.setMaxProducersToAudit(getMaxProducersToAudit());
155                    pending.start();
156                    // If nothing was in the persistent store, then try to use the
157                    // recovery policy.
158                    if (pending.isEmpty()) {
159                        for (Destination destination : durableDestinations.values()) {
160                            Topic topic = (Topic) destination;
161                            topic.recoverRetroactiveMessages(context, this);
162                        }
163                    }
164                }
165                this.active.set(true);
166                this.offlineTimestamp.set(-1);
167                dispatchPending();
168                this.usageManager.getMemoryUsage().addUsageListener(this);
169            }
170        }
171    
172        public void deactivate(boolean keepDurableSubsActive) throws Exception {
173            LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this);
174            active.set(false);
175            offlineTimestamp.set(System.currentTimeMillis());
176            this.usageManager.getMemoryUsage().removeUsageListener(this);
177            synchronized (pendingLock) {
178                pending.stop();
179    
180                synchronized (dispatchLock) {
181                    for (Destination destination : durableDestinations.values()) {
182                        Topic topic = (Topic) destination;
183                        if (!keepDurableSubsActive) {
184                            topic.deactivate(context, this);
185                        } else {
186                            topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
187                        }
188                    }
189    
190                    // Before we add these back to pending they need to be in producer order not
191                    // dispatch order so we can add them to the front of the pending list.
192                    Collections.reverse(dispatched);
193    
194                    for (final MessageReference node : dispatched) {
195                        // Mark the dispatched messages as redelivered for next time.
196                        Integer count = redeliveredMessages.get(node.getMessageId());
197                        if (count != null) {
198                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
199                        } else {
200                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
201                        }
202                        if (keepDurableSubsActive && pending.isTransient()) {
203                            pending.addMessageFirst(node);
204                            pending.rollback(node.getMessageId());
205                        } else {
206                            node.decrementReferenceCount();
207                        }
208                    }
209    
210                    dispatched.clear();
211                }
212                if (!keepDurableSubsActive && pending.isTransient()) {
213                    try {
214                        pending.reset();
215                        while (pending.hasNext()) {
216                            MessageReference node = pending.next();
217                            node.decrementReferenceCount();
218                            pending.remove();
219                        }
220                    } finally {
221                        pending.release();
222                    }
223                }
224            }
225            prefetchExtension.set(0);
226        }
227    
228        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
229            MessageDispatch md = super.createMessageDispatch(node, message);
230            if (node != QueueMessageReference.NULL_MESSAGE) {
231                Integer count = redeliveredMessages.get(node.getMessageId());
232                if (count != null) {
233                    md.setRedeliveryCounter(count.intValue());
234                }
235            }
236            return md;
237        }
238    
239        public void add(MessageReference node) throws Exception {
240            if (!active.get() && !keepDurableSubsActive) {
241                return;
242            }
243            super.add(node);
244        }
245    
246        protected void dispatchPending() throws IOException {
247            if (isActive()) {
248                super.dispatchPending();
249            }
250        }
251    
252        public void removePending(MessageReference node) throws IOException {
253            pending.remove(node);
254        }
255    
256        protected void doAddRecoveredMessage(MessageReference message) throws Exception {
257            synchronized (pending) {
258                pending.addRecoveredMessage(message);
259            }
260        }
261    
262        public int getPendingQueueSize() {
263            if (active.get() || keepDurableSubsActive) {
264                return super.getPendingQueueSize();
265            }
266            // TODO: need to get from store
267            return 0;
268        }
269    
270        public void setSelector(String selector) throws InvalidSelectorException {
271            throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
272        }
273    
274        protected boolean canDispatch(MessageReference node) {
275            return isActive();
276        }
277    
278        protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
279            node.getRegionDestination().acknowledge(context, this, ack, node);
280            redeliveredMessages.remove(node.getMessageId());
281            node.decrementReferenceCount();
282        }
283    
284        public synchronized String toString() {
285            return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
286                    + durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter
287                    + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
288        }
289    
290        public SubscriptionKey getSubscriptionKey() {
291            return subscriptionKey;
292        }
293    
294        /**
295         * Release any references that we are holding.
296         */
297        public void destroy() {
298            synchronized (pendingLock) {
299                try {
300                    pending.reset();
301                    while (pending.hasNext()) {
302                        MessageReference node = pending.next();
303                        node.decrementReferenceCount();
304                    }
305    
306                } finally {
307                    pending.release();
308                    pending.clear();
309                }
310            }
311            synchronized (dispatchLock) {
312                for (MessageReference node : dispatched) {
313                    node.decrementReferenceCount();
314                }
315                dispatched.clear();
316            }
317            setSlowConsumer(false);
318        }
319    
320        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
321            if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
322                try {
323                    dispatchPending();
324                } catch (IOException e) {
325                    LOG.warn("problem calling dispatchMatched", e);
326                }
327            }
328        }
329    
330        protected boolean isDropped(MessageReference node) {
331            return false;
332        }
333    
334        public boolean isKeepDurableSubsActive() {
335            return keepDurableSubsActive;
336        }
337    }