001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Collections;
022    import java.util.List;
023    import java.util.concurrent.ConcurrentHashMap;
024    import java.util.concurrent.atomic.AtomicBoolean;
025    import java.util.concurrent.atomic.AtomicLong;
026    
027    import javax.jms.InvalidSelectorException;
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.broker.Broker;
031    import org.apache.activemq.broker.ConnectionContext;
032    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033    import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
034    import org.apache.activemq.broker.region.policy.PolicyEntry;
035    import org.apache.activemq.command.ActiveMQDestination;
036    import org.apache.activemq.command.ConsumerInfo;
037    import org.apache.activemq.command.Message;
038    import org.apache.activemq.command.MessageAck;
039    import org.apache.activemq.command.MessageDispatch;
040    import org.apache.activemq.command.MessageId;
041    import org.apache.activemq.store.TopicMessageStore;
042    import org.apache.activemq.usage.SystemUsage;
043    import org.apache.activemq.usage.Usage;
044    import org.apache.activemq.usage.UsageListener;
045    import org.apache.activemq.util.SubscriptionKey;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
050    
051        private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
052        private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
053        private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
054        private final SubscriptionKey subscriptionKey;
055        private final boolean keepDurableSubsActive;
056        private final AtomicBoolean active = new AtomicBoolean();
057        private final AtomicLong offlineTimestamp = new AtomicLong(-1);
058    
059        public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
060                throws JMSException {
061            super(broker, usageManager, context, info);
062            this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
063            this.pending.setSystemUsage(usageManager);
064            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
065            this.keepDurableSubsActive = keepDurableSubsActive;
066            subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
067        }
068    
069        public final boolean isActive() {
070            return active.get();
071        }
072    
073        public final long getOfflineTimestamp() {
074            return offlineTimestamp.get();
075        }
076    
077        public void setOfflineTimestamp(long timestamp) {
078            offlineTimestamp.set(timestamp);
079        }
080    
081        @Override
082        public boolean isFull() {
083            return !active.get() || super.isFull();
084        }
085    
086        @Override
087        public void gc() {
088        }
089    
090        /**
091         * store will have a pending ack for all durables, irrespective of the
092         * selector so we need to ack if node is un-matched
093         */
094        @Override
095        public void unmatched(MessageReference node) throws IOException {
096            MessageAck ack = new MessageAck();
097            ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
098            ack.setMessageID(node.getMessageId());
099            Destination regionDestination = (Destination) node.getRegionDestination();
100            regionDestination.acknowledge(this.getContext(), this, ack, node);
101        }
102    
103        @Override
104        protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
105            // statically configured via maxPageSize
106        }
107    
108        @Override
109        public void add(ConnectionContext context, Destination destination) throws Exception {
110            if (!destinations.contains(destination)) {
111                super.add(context, destination);
112            }
113            // do it just once per destination
114            if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
115                return;
116            }
117            durableDestinations.put(destination.getActiveMQDestination(), destination);
118    
119            if (active.get() || keepDurableSubsActive) {
120                Topic topic = (Topic) destination;
121                topic.activate(context, this);
122                if (pending.isEmpty(topic)) {
123                    topic.recoverRetroactiveMessages(context, this);
124                }
125                this.enqueueCounter += pending.size();
126            } else if (destination.getMessageStore() != null) {
127                TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
128                try {
129                    this.enqueueCounter += store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName());
130                } catch (IOException e) {
131                    JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e);
132                    jmsEx.setLinkedException(e);
133                    throw jmsEx;
134                }
135            }
136            dispatchPending();
137        }
138    
139        public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
140            if (!active.get()) {
141                this.context = context;
142                this.info = info;
143    
144                LOG.debug("Activating {}", this);
145                if (!keepDurableSubsActive) {
146                    for (Destination destination : durableDestinations.values()) {
147                        Topic topic = (Topic) destination;
148                        add(context, topic);
149                        topic.activate(context, this);
150                    }
151    
152                    // On Activation we should update the configuration based on our new consumer info.
153                    ActiveMQDestination dest = this.info.getDestination();
154                    if (dest != null && regionBroker.getDestinationPolicy() != null) {
155                        PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
156                        if (entry != null) {
157                            entry.configure(broker, usageManager, this);
158                        }
159                    }
160                }
161    
162                synchronized (pendingLock) {
163                    if (!((StoreDurableSubscriberCursor) pending).isStarted() || !keepDurableSubsActive) {
164                        pending.setSystemUsage(memoryManager);
165                        pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
166                        pending.setMaxAuditDepth(getMaxAuditDepth());
167                        pending.setMaxProducersToAudit(getMaxProducersToAudit());
168                        pending.start();
169                        // If nothing was in the persistent store, then try to use the
170                        // recovery policy.
171                        if (pending.isEmpty()) {
172                            for (Destination destination : durableDestinations.values()) {
173                                Topic topic = (Topic) destination;
174                                topic.recoverRetroactiveMessages(context, this);
175                            }
176                        }
177                    }
178                }
179                this.active.set(true);
180                this.offlineTimestamp.set(-1);
181                dispatchPending();
182                this.usageManager.getMemoryUsage().addUsageListener(this);
183            }
184        }
185    
186        public void deactivate(boolean keepDurableSubsActive) throws Exception {
187            LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this);
188            active.set(false);
189            offlineTimestamp.set(System.currentTimeMillis());
190            this.usageManager.getMemoryUsage().removeUsageListener(this);
191    
192            ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>();
193            List<MessageReference> savedDispateched = null;
194    
195            synchronized (pendingLock) {
196                if (!keepDurableSubsActive) {
197                    pending.stop();
198                }
199    
200                synchronized (dispatchLock) {
201                    for (Destination destination : durableDestinations.values()) {
202                        Topic topic = (Topic) destination;
203                        if (!keepDurableSubsActive) {
204                            topicsToDeactivate.add(topic);
205                        } else {
206                            topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
207                        }
208                    }
209    
210                    // Before we add these back to pending they need to be in producer order not
211                    // dispatch order so we can add them to the front of the pending list.
212                    Collections.reverse(dispatched);
213    
214                    for (final MessageReference node : dispatched) {
215                        // Mark the dispatched messages as redelivered for next time.
216                        Integer count = redeliveredMessages.get(node.getMessageId());
217                        if (count != null) {
218                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
219                        } else {
220                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
221                        }
222                        if (keepDurableSubsActive && pending.isTransient()) {
223                            pending.addMessageFirst(node);
224                            pending.rollback(node.getMessageId());
225                        } else {
226                            node.decrementReferenceCount();
227                        }
228                    }
229    
230                    if (!topicsToDeactivate.isEmpty()) {
231                        savedDispateched = new ArrayList<MessageReference>(dispatched);
232                    }
233                    dispatched.clear();
234                }
235                if (!keepDurableSubsActive && pending.isTransient()) {
236                    try {
237                        pending.reset();
238                        while (pending.hasNext()) {
239                            MessageReference node = pending.next();
240                            node.decrementReferenceCount();
241                            pending.remove();
242                        }
243                    } finally {
244                        pending.release();
245                    }
246                }
247            }
248            for(Topic topic: topicsToDeactivate) {
249                topic.deactivate(context, this, savedDispateched);
250            }
251            prefetchExtension.set(0);
252        }
253    
254        @Override
255        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
256            MessageDispatch md = super.createMessageDispatch(node, message);
257            if (node != QueueMessageReference.NULL_MESSAGE) {
258                Integer count = redeliveredMessages.get(node.getMessageId());
259                if (count != null) {
260                    md.setRedeliveryCounter(count.intValue());
261                }
262            }
263            return md;
264        }
265    
266        @Override
267        public void add(MessageReference node) throws Exception {
268            if (!active.get() && !keepDurableSubsActive) {
269                return;
270            }
271            super.add(node);
272        }
273    
274        @Override
275        protected void dispatchPending() throws IOException {
276            if (isActive()) {
277                super.dispatchPending();
278            }
279        }
280    
281        public void removePending(MessageReference node) throws IOException {
282            pending.remove(node);
283        }
284    
285        @Override
286        protected void doAddRecoveredMessage(MessageReference message) throws Exception {
287            synchronized (pending) {
288                pending.addRecoveredMessage(message);
289            }
290        }
291    
292        @Override
293        public int getPendingQueueSize() {
294            if (active.get() || keepDurableSubsActive) {
295                return super.getPendingQueueSize();
296            }
297            // TODO: need to get from store
298            return 0;
299        }
300    
301        @Override
302        public void setSelector(String selector) throws InvalidSelectorException {
303            throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
304        }
305    
306        @Override
307        protected boolean canDispatch(MessageReference node) {
308            return true;  // let them go, our dispatchPending gates the active / inactive state.
309        }
310    
311        @Override
312        protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
313            this.setTimeOfLastMessageAck(System.currentTimeMillis());
314            Destination regionDestination = (Destination) node.getRegionDestination();
315            regionDestination.acknowledge(context, this, ack, node);
316            redeliveredMessages.remove(node.getMessageId());
317            node.decrementReferenceCount();
318        }
319    
320        @Override
321        public synchronized String toString() {
322            return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
323                    + durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter
324                    + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
325        }
326    
327        public SubscriptionKey getSubscriptionKey() {
328            return subscriptionKey;
329        }
330    
331        /**
332         * Release any references that we are holding.
333         */
334        @Override
335        public void destroy() {
336            synchronized (pendingLock) {
337                try {
338                    pending.reset();
339                    while (pending.hasNext()) {
340                        MessageReference node = pending.next();
341                        node.decrementReferenceCount();
342                    }
343    
344                } finally {
345                    pending.release();
346                    pending.clear();
347                }
348            }
349            synchronized (dispatchLock) {
350                for (MessageReference node : dispatched) {
351                    node.decrementReferenceCount();
352                }
353                dispatched.clear();
354            }
355            setSlowConsumer(false);
356        }
357    
358        @Override
359        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
360            if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
361                try {
362                    dispatchPending();
363                } catch (IOException e) {
364                    LOG.warn("problem calling dispatchMatched", e);
365                }
366            }
367        }
368    
369        @Override
370        protected boolean isDropped(MessageReference node) {
371            return false;
372        }
373    
374        public boolean isKeepDurableSubsActive() {
375            return keepDurableSubsActive;
376        }
377    }