001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.concurrent.ConcurrentHashMap;
022    
023    import javax.jms.InvalidSelectorException;
024    import javax.jms.JMSException;
025    
026    import org.apache.activemq.broker.Broker;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ConsumerInfo;
031    import org.apache.activemq.command.Message;
032    import org.apache.activemq.command.MessageAck;
033    import org.apache.activemq.command.MessageDispatch;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.store.TopicMessageStore;
036    import org.apache.activemq.usage.SystemUsage;
037    import org.apache.activemq.usage.Usage;
038    import org.apache.activemq.usage.UsageListener;
039    import org.apache.activemq.util.SubscriptionKey;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
044    
045        private static final Log LOG = LogFactory.getLog(DurableTopicSubscription.class);
046        private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
047        private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
048        private final SubscriptionKey subscriptionKey;
049        private final boolean keepDurableSubsActive;
050        private boolean active;
051    
052        public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
053            throws JMSException {
054            super(broker,usageManager, context, info);
055            this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
056            this.pending.setSystemUsage(usageManager);
057            this.keepDurableSubsActive = keepDurableSubsActive;
058            subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
059            
060        }
061    
062        public boolean isActive() {
063            return active;
064        }
065    
066        public boolean isFull() {
067            return !active || super.isFull();
068        }
069    
070        public void gc() {
071        }
072    
073        public void add(ConnectionContext context, Destination destination) throws Exception {
074            super.add(context, destination);
075            destinations.put(destination.getActiveMQDestination(), destination);
076            if (destination.getMessageStore() != null) {
077                TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
078                try {
079                    this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
080                } catch (IOException e) {
081                    JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
082                    jmsEx.setLinkedException(e);
083                    throw jmsEx;
084                }
085            }
086            if (active || keepDurableSubsActive) {
087                Topic topic = (Topic)destination;
088                topic.activate(context, this);
089                if (pending.isEmpty(topic)) {
090                    topic.recoverRetroactiveMessages(context, this);
091                }
092            }
093            dispatchPending();
094        }
095    
096        public void activate(SystemUsage memoryManager, ConnectionContext context,
097                ConsumerInfo info) throws Exception {
098            LOG.debug("Activating " + this);
099            if (!active) {
100                this.active = true;
101                this.context = context;
102                this.info = info;
103                int prefetch = info.getPrefetchSize();
104                if (prefetch>0) {
105                prefetch += prefetch/2;
106                }
107                int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
108                this.pending.setMaxAuditDepth(depth);
109                if (!keepDurableSubsActive) {
110                    for (Iterator<Destination> iter = destinations.values()
111                            .iterator(); iter.hasNext();) {
112                        Topic topic = (Topic) iter.next();
113                        topic.activate(context, this);
114                    }
115                }
116                synchronized (pending) {
117                    pending.setSystemUsage(memoryManager);
118                    pending.start();
119    
120                    // If nothing was in the persistent store, then try to use the
121                    // recovery policy.
122                    if (pending.isEmpty()) {
123                        for (Iterator<Destination> iter = destinations.values()
124                                .iterator(); iter.hasNext();) {
125                            Topic topic = (Topic) iter.next();
126                            topic.recoverRetroactiveMessages(context, this);
127                        }
128                    }
129                }
130                dispatchPending();
131                this.usageManager.getMemoryUsage().addUsageListener(this);
132            }
133        }
134    
135        public void deactivate(boolean keepDurableSubsActive) throws Exception {
136            active = false;
137            this.usageManager.getMemoryUsage().removeUsageListener(this);
138            synchronized (pending) {
139                pending.stop();
140            }
141            if (!keepDurableSubsActive) {
142                for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
143                    Topic topic = (Topic)iter.next();
144                    topic.deactivate(context, this);
145                }
146            }
147            for (final MessageReference node : dispatched) {
148                // Mark the dispatched messages as redelivered for next time.
149                Integer count = redeliveredMessages.get(node.getMessageId());
150                if (count != null) {
151                    redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
152                } else {
153                    redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
154                }
155                if (keepDurableSubsActive&& pending.isTransient()) {
156                    synchronized (pending) {
157                        pending.addMessageFirst(node);
158                    }
159                } else {
160                    node.decrementReferenceCount();
161                }
162            }
163            synchronized(dispatched) {
164                dispatched.clear();
165            }
166            if (!keepDurableSubsActive && pending.isTransient()) {
167                synchronized (pending) {
168                    try {
169                        pending.reset();
170                        while (pending.hasNext()) {
171                            MessageReference node = pending.next();
172                            node.decrementReferenceCount();
173                            pending.remove();
174                        }
175                    } finally {
176                        pending.release();
177                    }
178                }
179            }
180            prefetchExtension = 0;
181        }
182        
183        
184        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
185            MessageDispatch md = super.createMessageDispatch(node, message);
186            Integer count = redeliveredMessages.get(node.getMessageId());
187            if (count != null) {
188                md.setRedeliveryCounter(count.intValue());
189            }
190            return md;
191        }
192    
193        public void add(MessageReference node) throws Exception {
194            if (!active && !keepDurableSubsActive) {
195                return;
196            }
197            super.add(node);
198        }
199    
200        protected void doAddRecoveredMessage(MessageReference message) throws Exception {
201            synchronized(pending) {
202            pending.addRecoveredMessage(message);
203            }
204        }
205    
206        public int getPendingQueueSize() {
207            if (active || keepDurableSubsActive) {
208                return super.getPendingQueueSize();
209            }
210            // TODO: need to get from store
211            return 0;
212        }
213    
214        public void setSelector(String selector) throws InvalidSelectorException {
215            throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
216        }
217    
218        protected boolean canDispatch(MessageReference node) {
219            return active;
220        }
221    
222        protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
223            node.getRegionDestination().acknowledge(context, this, ack, node);
224            redeliveredMessages.remove(node.getMessageId());
225            node.decrementReferenceCount();
226        }
227    
228        
229        public synchronized String toString() {
230            return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
231                   + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
232        }
233    
234        public SubscriptionKey getSubscriptionKey() {
235            return subscriptionKey;
236        }
237    
238        /**
239         * Release any references that we are holding.
240         */
241        public void destroy() {
242            synchronized (pending) {
243                try {
244    
245                    pending.reset();
246                    while (pending.hasNext()) {
247                        MessageReference node = pending.next();
248                        node.decrementReferenceCount();
249                    }
250    
251                } finally {
252                    pending.release();
253                    pending.clear();
254                }
255            }
256            synchronized(dispatched) {
257                for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
258                    MessageReference node = (MessageReference) iter.next();
259                    node.decrementReferenceCount();
260                }
261                dispatched.clear();
262            }
263        }
264    
265        /**
266         * @param usageManager
267         * @param oldPercentUsage
268         * @param newPercentUsage
269         * @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
270         *      int, int)
271         */
272        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
273            if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
274                try {
275                    dispatchPending();
276                } catch (IOException e) {
277                    LOG.warn("problem calling dispatchMatched", e);
278                }
279            }
280        }
281        
282        protected boolean isDropped(MessageReference node) {
283           return false;
284         }
285    }