001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.HashSet;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.Set;
024    import java.util.Timer;
025    import java.util.TimerTask;
026    import java.util.concurrent.ConcurrentHashMap;
027    
028    import javax.jms.InvalidDestinationException;
029    import javax.jms.JMSException;
030    
031    import org.apache.activemq.advisory.AdvisorySupport;
032    import org.apache.activemq.broker.ConnectionContext;
033    import org.apache.activemq.broker.region.policy.PolicyEntry;
034    import org.apache.activemq.command.ActiveMQDestination;
035    import org.apache.activemq.command.ConnectionId;
036    import org.apache.activemq.command.ConsumerId;
037    import org.apache.activemq.command.ConsumerInfo;
038    import org.apache.activemq.command.RemoveSubscriptionInfo;
039    import org.apache.activemq.command.SessionId;
040    import org.apache.activemq.command.SubscriptionInfo;
041    import org.apache.activemq.store.TopicMessageStore;
042    import org.apache.activemq.thread.TaskRunnerFactory;
043    import org.apache.activemq.usage.SystemUsage;
044    import org.apache.activemq.util.LongSequenceGenerator;
045    import org.apache.activemq.util.SubscriptionKey;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     *
051     */
052    public class TopicRegion extends AbstractRegion {
053        private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
054        protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
055        private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
056        private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
057        private boolean keepDurableSubsActive;
058    
059        private Timer cleanupTimer;
060        private TimerTask cleanupTask;
061    
062        public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
063                           DestinationFactory destinationFactory) {
064            super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
065            if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
066                this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
067                this.cleanupTask = new TimerTask() {
068                    public void run() {
069                        doCleanup();
070                    }
071                };
072                this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
073            }
074        }
075    
076        @Override
077        public void stop() throws Exception {
078            super.stop();
079            if (cleanupTimer != null) {
080                cleanupTimer.cancel();
081            }
082        }
083    
084        public void doCleanup() {
085            long now = System.currentTimeMillis();
086            for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
087                DurableTopicSubscription sub = entry.getValue();
088                if (!sub.isActive()) {
089                    long offline = sub.getOfflineTimestamp();
090                    if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
091                        LOG.info("Destroying durable subscriber due to inactivity: " + sub);
092                        try {
093                            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
094                            info.setClientId(entry.getKey().getClientId());
095                            info.setSubscriptionName(entry.getKey().getSubscriptionName());
096                            ConnectionContext context = new ConnectionContext();
097                            context.setBroker(broker);
098                            context.setClientId(entry.getKey().getClientId());
099                            removeSubscription(context, info);
100                        } catch (Exception e) {
101                            LOG.error("Failed to remove inactive durable subscriber", e);
102                        }
103                    }
104                }
105            }
106        }
107    
108        @Override
109        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
110            if (info.isDurable()) {
111                ActiveMQDestination destination = info.getDestination();
112                if (!destination.isPattern()) {
113                    // Make sure the destination is created.
114                    lookup(context, destination,true);
115                }
116                String clientId = context.getClientId();
117                String subscriptionName = info.getSubscriptionName();
118                SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
119                DurableTopicSubscription sub = durableSubscriptions.get(key);
120                if (sub != null) {
121                    if (sub.isActive()) {
122                        throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
123                    }
124                    // Has the selector changed??
125                    if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
126                        // Remove the consumer first then add it.
127                        durableSubscriptions.remove(key);
128                        destinationsLock.readLock().lock();
129                        try {
130                            for (Destination dest : destinations.values()) {
131                                //Account for virtual destinations
132                                if (dest instanceof Topic){
133                                    Topic topic = (Topic)dest;
134                                    topic.deleteSubscription(context, key);
135                                }
136                            }
137                        } finally {
138                            destinationsLock.readLock().unlock();
139                        }
140                        super.removeConsumer(context, sub.getConsumerInfo());
141                        super.addConsumer(context, info);
142                        sub = durableSubscriptions.get(key);
143                    } else {
144                        // Change the consumer id key of the durable sub.
145                        if (sub.getConsumerInfo().getConsumerId() != null) {
146                            subscriptions.remove(sub.getConsumerInfo().getConsumerId());
147                        }
148                        subscriptions.put(info.getConsumerId(), sub);
149                    }
150                } else {
151                    super.addConsumer(context, info);
152                    sub = durableSubscriptions.get(key);
153                    if (sub == null) {
154                        throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
155                                               + " subscriberName: " + key.getSubscriptionName());
156                    }
157                }
158                sub.activate(usageManager, context, info, broker);
159                return sub;
160            } else {
161                return super.addConsumer(context, info);
162            }
163        }
164    
165        @Override
166        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
167            if (info.isDurable()) {
168    
169                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
170                DurableTopicSubscription sub = durableSubscriptions.get(key);
171                if (sub != null) {
172                    sub.deactivate(keepDurableSubsActive);
173                }
174    
175            } else {
176                super.removeConsumer(context, info);
177            }
178        }
179    
180        @Override
181        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
182            SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
183            DurableTopicSubscription sub = durableSubscriptions.get(key);
184            if (sub == null) {
185                throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
186            }
187            if (sub.isActive()) {
188                throw new JMSException("Durable consumer is in use");
189            } else {
190                durableSubscriptions.remove(key);
191            }
192    
193            destinationsLock.readLock().lock();
194            try {
195                for (Destination dest : destinations.values()) {
196                    //Account for virtual destinations
197                    if (dest instanceof Topic){
198                        Topic topic = (Topic)dest;
199                        topic.deleteSubscription(context, key);
200                    }
201                }
202            } finally {
203                destinationsLock.readLock().unlock();
204            }
205    
206            if (subscriptions.get(sub.getConsumerInfo().getConsumerId()) != null) {
207                super.removeConsumer(context, sub.getConsumerInfo());
208            } else {
209                // try destroying inactive subscriptions
210                destroySubscription(sub);
211            }
212        }
213    
214        @Override
215        public String toString() {
216            return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
217        }
218    
219        @Override
220        protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
221            List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
222            Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
223    
224            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
225            // Eagerly recover the durable subscriptions
226            if (store != null) {
227                SubscriptionInfo[] infos = store.getAllSubscriptions();
228                for (int i = 0; i < infos.length; i++) {
229    
230                    SubscriptionInfo info = infos[i];
231                    if (LOG.isDebugEnabled()) {
232                        LOG.debug("Restoring durable subscription: " + info);
233                    }
234                    SubscriptionKey key = new SubscriptionKey(info);
235    
236                    // A single durable sub may be subscribing to multiple topics.
237                    // so it might exist already.
238                    DurableTopicSubscription sub = durableSubscriptions.get(key);
239                    ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
240                    if (sub == null) {
241                        ConnectionContext c = new ConnectionContext();
242                        c.setBroker(context.getBroker());
243                        c.setClientId(key.getClientId());
244                        c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
245                        sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
246                        sub.setOfflineTimestamp(System.currentTimeMillis());
247                    }
248    
249                    if (dupChecker.contains(sub)) {
250                        continue;
251                    }
252    
253                    dupChecker.add(sub);
254                    rc.add(sub);
255                    dest.addSubscription(context, sub);
256                }
257    
258                // Now perhaps there other durable subscriptions (via wild card)
259                // that would match this destination..
260                durableSubscriptions.values();
261                for (DurableTopicSubscription sub : durableSubscriptions.values()) {
262                    // Skip over subscriptions that we already added..
263                    if (dupChecker.contains(sub)) {
264                        continue;
265                    }
266    
267                    if (sub.matches(dest.getActiveMQDestination())) {
268                        rc.add(sub);
269                        dest.addSubscription(context, sub);
270                    }
271                }
272            }
273            return rc;
274        }
275    
276        public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
277            ConsumerInfo rc = new ConsumerInfo();
278            rc.setSelector(info.getSelector());
279            rc.setSubscriptionName(info.getSubscriptionName());
280            rc.setDestination(info.getSubscribedDestination());
281            rc.setConsumerId(createConsumerId());
282            return rc;
283        }
284    
285        private ConsumerId createConsumerId() {
286            return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
287        }
288    
289        protected void configureTopic(Topic topic, ActiveMQDestination destination) {
290            if (broker.getDestinationPolicy() != null) {
291                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
292                if (entry != null) {
293                    entry.configure(broker,topic);
294                }
295            }
296        }
297    
298        @Override
299        protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
300            ActiveMQDestination destination = info.getDestination();
301    
302            if (info.isDurable()) {
303                if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
304                    throw new JMSException("Cannot create a durable subscription for an advisory Topic");
305                }
306                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
307                DurableTopicSubscription sub = durableSubscriptions.get(key);
308    
309                if (sub == null) {
310    
311                    sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
312    
313                    if (destination != null && broker.getDestinationPolicy() != null) {
314                        PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
315                        if (entry != null) {
316                            entry.configure(broker, usageManager, sub);
317                        }
318                    }
319                    durableSubscriptions.put(key, sub);
320                } else {
321                    throw new JMSException("That durable subscription is already active.");
322                }
323                return sub;
324            }
325            try {
326                TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
327                // lets configure the subscription depending on the destination
328                if (destination != null && broker.getDestinationPolicy() != null) {
329                    PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
330                    if (entry != null) {
331                        entry.configure(broker, usageManager, answer);
332                    }
333                }
334                answer.init();
335                return answer;
336            } catch (Exception e) {
337                LOG.error("Failed to create TopicSubscription ", e);
338                JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
339                jmsEx.setLinkedException(e);
340                throw jmsEx;
341            }
342        }
343    
344        private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
345            if (info1.getSelector() != null ^ info2.getSelector() != null) {
346                return true;
347            }
348            if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
349                return true;
350            }
351            return !info1.getDestination().equals(info2.getDestination());
352        }
353    
354        @Override
355        protected Set<ActiveMQDestination> getInactiveDestinations() {
356            Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
357            for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
358                ActiveMQDestination dest = iter.next();
359                if (!dest.isTopic()) {
360                    iter.remove();
361                }
362            }
363            return inactiveDestinations;
364        }
365    
366        public boolean isKeepDurableSubsActive() {
367            return keepDurableSubsActive;
368        }
369    
370        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
371            this.keepDurableSubsActive = keepDurableSubsActive;
372        }
373    
374        public boolean durableSubscriptionExists(SubscriptionKey key) {
375            return this.durableSubscriptions.containsKey(key);
376        }
377    }