001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.Collections;
021    import java.util.concurrent.ConcurrentHashMap;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import java.util.concurrent.atomic.AtomicLong;
024    
025    import javax.jms.InvalidSelectorException;
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.broker.Broker;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031    import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
032    import org.apache.activemq.broker.region.policy.PolicyEntry;
033    import org.apache.activemq.command.ActiveMQDestination;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.Message;
036    import org.apache.activemq.command.MessageAck;
037    import org.apache.activemq.command.MessageDispatch;
038    import org.apache.activemq.command.MessageId;
039    import org.apache.activemq.store.TopicMessageStore;
040    import org.apache.activemq.usage.SystemUsage;
041    import org.apache.activemq.usage.Usage;
042    import org.apache.activemq.usage.UsageListener;
043    import org.apache.activemq.util.SubscriptionKey;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
048    
049        private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
050        private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
051        private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
052        private final SubscriptionKey subscriptionKey;
053        private final boolean keepDurableSubsActive;
054        private final AtomicBoolean active = new AtomicBoolean();
055        private final AtomicLong offlineTimestamp = new AtomicLong(-1);
056    
057        public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
058                throws JMSException {
059            super(broker, usageManager, context, info);
060            this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
061            this.pending.setSystemUsage(usageManager);
062            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
063            this.keepDurableSubsActive = keepDurableSubsActive;
064            subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
065        }
066    
067        public final boolean isActive() {
068            return active.get();
069        }
070    
071        public final long getOfflineTimestamp() {
072            return offlineTimestamp.get();
073        }
074    
075        public void setOfflineTimestamp(long timestamp) {
076            offlineTimestamp.set(timestamp);
077        }
078    
079        @Override
080        public boolean isFull() {
081            return !active.get() || super.isFull();
082        }
083    
084        @Override
085        public void gc() {
086        }
087    
088        /**
089         * store will have a pending ack for all durables, irrespective of the
090         * selector so we need to ack if node is un-matched
091         */
092        @Override
093        public void unmatched(MessageReference node) throws IOException {
094            MessageAck ack = new MessageAck();
095            ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
096            ack.setMessageID(node.getMessageId());
097            Destination regionDestination = (Destination) node.getRegionDestination();
098            regionDestination.acknowledge(this.getContext(), this, ack, node);
099        }
100    
101        @Override
102        protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
103            // statically configured via maxPageSize
104        }
105    
106        @Override
107        public void add(ConnectionContext context, Destination destination) throws Exception {
108            if (!destinations.contains(destination)) {
109                super.add(context, destination);
110            }
111            // do it just once per destination
112            if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
113                return;
114            }
115            durableDestinations.put(destination.getActiveMQDestination(), destination);
116    
117            if (active.get() || keepDurableSubsActive) {
118                Topic topic = (Topic) destination;
119                topic.activate(context, this);
120                if (pending.isEmpty(topic)) {
121                    topic.recoverRetroactiveMessages(context, this);
122                }
123                this.enqueueCounter += pending.size();
124            } else if (destination.getMessageStore() != null) {
125                TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
126                try {
127                    this.enqueueCounter += store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName());
128                } catch (IOException e) {
129                    JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e);
130                    jmsEx.setLinkedException(e);
131                    throw jmsEx;
132                }
133            }
134            dispatchPending();
135        }
136    
137        public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
138            if (!active.get()) {
139                this.context = context;
140                this.info = info;
141    
142                if (LOG.isDebugEnabled()) {
143                    LOG.debug("Activating " + this);
144                }
145                if (!keepDurableSubsActive) {
146                    for (Destination destination : durableDestinations.values()) {
147                        Topic topic = (Topic) destination;
148                        add(context, topic);
149                        topic.activate(context, this);
150                    }
151    
152                    // On Activation we should update the configuration based on our new consumer info.
153                    ActiveMQDestination dest = this.info.getDestination();
154                    if (dest != null && regionBroker.getDestinationPolicy() != null) {
155                        PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
156                        if (entry != null) {
157                            entry.configure(broker, usageManager, this);
158                        }
159                    }
160                }
161    
162                synchronized (pendingLock) {
163                    pending.setSystemUsage(memoryManager);
164                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
165                    pending.setMaxAuditDepth(getMaxAuditDepth());
166                    pending.setMaxProducersToAudit(getMaxProducersToAudit());
167                    pending.start();
168                    // If nothing was in the persistent store, then try to use the
169                    // recovery policy.
170                    if (pending.isEmpty()) {
171                        for (Destination destination : durableDestinations.values()) {
172                            Topic topic = (Topic) destination;
173                            topic.recoverRetroactiveMessages(context, this);
174                        }
175                    }
176                }
177                this.active.set(true);
178                this.offlineTimestamp.set(-1);
179                dispatchPending();
180                this.usageManager.getMemoryUsage().addUsageListener(this);
181            }
182        }
183    
184        public void deactivate(boolean keepDurableSubsActive) throws Exception {
185            if (LOG.isDebugEnabled()) {
186                LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this);
187            }
188            active.set(false);
189            offlineTimestamp.set(System.currentTimeMillis());
190            this.usageManager.getMemoryUsage().removeUsageListener(this);
191            synchronized (pendingLock) {
192                pending.stop();
193    
194                synchronized (dispatchLock) {
195                    for (Destination destination : durableDestinations.values()) {
196                        Topic topic = (Topic) destination;
197                        if (!keepDurableSubsActive) {
198                            topic.deactivate(context, this);
199                        } else {
200                            topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
201                        }
202                    }
203    
204                    // Before we add these back to pending they need to be in producer order not
205                    // dispatch order so we can add them to the front of the pending list.
206                    Collections.reverse(dispatched);
207    
208                    for (final MessageReference node : dispatched) {
209                        // Mark the dispatched messages as redelivered for next time.
210                        Integer count = redeliveredMessages.get(node.getMessageId());
211                        if (count != null) {
212                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
213                        } else {
214                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
215                        }
216                        if (keepDurableSubsActive && pending.isTransient()) {
217                            pending.addMessageFirst(node);
218                            pending.rollback(node.getMessageId());
219                        } else {
220                            node.decrementReferenceCount();
221                        }
222                    }
223    
224                    dispatched.clear();
225                }
226                if (!keepDurableSubsActive && pending.isTransient()) {
227                    try {
228                        pending.reset();
229                        while (pending.hasNext()) {
230                            MessageReference node = pending.next();
231                            node.decrementReferenceCount();
232                            pending.remove();
233                        }
234                    } finally {
235                        pending.release();
236                    }
237                }
238            }
239            prefetchExtension.set(0);
240        }
241    
242        @Override
243        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
244            MessageDispatch md = super.createMessageDispatch(node, message);
245            if (node != QueueMessageReference.NULL_MESSAGE) {
246                Integer count = redeliveredMessages.get(node.getMessageId());
247                if (count != null) {
248                    md.setRedeliveryCounter(count.intValue());
249                }
250            }
251            return md;
252        }
253    
254        @Override
255        public void add(MessageReference node) throws Exception {
256            if (!active.get() && !keepDurableSubsActive) {
257                return;
258            }
259            super.add(node);
260        }
261    
262        @Override
263        protected void dispatchPending() throws IOException {
264            if (isActive()) {
265                super.dispatchPending();
266            }
267        }
268    
269        public void removePending(MessageReference node) throws IOException {
270            pending.remove(node);
271        }
272    
273        @Override
274        protected void doAddRecoveredMessage(MessageReference message) throws Exception {
275            synchronized (pending) {
276                pending.addRecoveredMessage(message);
277            }
278        }
279    
280        @Override
281        public int getPendingQueueSize() {
282            if (active.get() || keepDurableSubsActive) {
283                return super.getPendingQueueSize();
284            }
285            // TODO: need to get from store
286            return 0;
287        }
288    
289        @Override
290        public void setSelector(String selector) throws InvalidSelectorException {
291            throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
292        }
293    
294        @Override
295        protected boolean canDispatch(MessageReference node) {
296            return isActive();
297        }
298    
299        @Override
300        protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
301            Destination regionDestination = (Destination) node.getRegionDestination();
302            regionDestination.acknowledge(context, this, ack, node);
303            redeliveredMessages.remove(node.getMessageId());
304            node.decrementReferenceCount();
305        }
306    
307        @Override
308        public synchronized String toString() {
309            return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
310                    + durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter
311                    + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
312        }
313    
314        public SubscriptionKey getSubscriptionKey() {
315            return subscriptionKey;
316        }
317    
318        /**
319         * Release any references that we are holding.
320         */
321        @Override
322        public void destroy() {
323            synchronized (pendingLock) {
324                try {
325                    pending.reset();
326                    while (pending.hasNext()) {
327                        MessageReference node = pending.next();
328                        node.decrementReferenceCount();
329                    }
330    
331                } finally {
332                    pending.release();
333                    pending.clear();
334                }
335            }
336            synchronized (dispatchLock) {
337                for (MessageReference node : dispatched) {
338                    node.decrementReferenceCount();
339                }
340                dispatched.clear();
341            }
342            setSlowConsumer(false);
343        }
344    
345        @Override
346        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
347            if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
348                try {
349                    dispatchPending();
350                } catch (IOException e) {
351                    LOG.warn("problem calling dispatchMatched", e);
352                }
353            }
354        }
355    
356        @Override
357        protected boolean isDropped(MessageReference node) {
358            return false;
359        }
360    
361        public boolean isKeepDurableSubsActive() {
362            return keepDurableSubsActive;
363        }
364    }