001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.HashSet;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.Set;
024    import java.util.Timer;
025    import java.util.TimerTask;
026    import java.util.concurrent.ConcurrentHashMap;
027    
028    import javax.jms.InvalidDestinationException;
029    import javax.jms.JMSException;
030    
031    import org.apache.activemq.advisory.AdvisorySupport;
032    import org.apache.activemq.broker.ConnectionContext;
033    import org.apache.activemq.broker.region.policy.PolicyEntry;
034    import org.apache.activemq.command.ActiveMQDestination;
035    import org.apache.activemq.command.ConnectionId;
036    import org.apache.activemq.command.ConsumerId;
037    import org.apache.activemq.command.ConsumerInfo;
038    import org.apache.activemq.command.RemoveSubscriptionInfo;
039    import org.apache.activemq.command.SessionId;
040    import org.apache.activemq.command.SubscriptionInfo;
041    import org.apache.activemq.store.TopicMessageStore;
042    import org.apache.activemq.thread.TaskRunnerFactory;
043    import org.apache.activemq.usage.SystemUsage;
044    import org.apache.activemq.util.LongSequenceGenerator;
045    import org.apache.activemq.util.SubscriptionKey;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     *
051     */
052    public class TopicRegion extends AbstractRegion {
053        private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
054        protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
055        private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
056        private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
057        private boolean keepDurableSubsActive;
058    
059        private Timer cleanupTimer;
060        private TimerTask cleanupTask;
061    
062        public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
063                           DestinationFactory destinationFactory) {
064            super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
065            if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
066                this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
067                this.cleanupTask = new TimerTask() {
068                    public void run() {
069                        doCleanup();
070                    }
071                };
072                this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
073            }
074        }
075    
076        @Override
077        public void stop() throws Exception {
078            super.stop();
079            if (cleanupTimer != null) {
080                cleanupTimer.cancel();
081            }
082        }
083    
084        public void doCleanup() {
085            long now = System.currentTimeMillis();
086            for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
087                DurableTopicSubscription sub = entry.getValue();
088                if (!sub.isActive()) {
089                   long offline = sub.getOfflineTimestamp();
090                    if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
091                        LOG.info("Destroying durable subscriber due to inactivity: " + sub);
092                        try {
093                            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
094                            info.setClientId(entry.getKey().getClientId());
095                            info.setSubscriptionName(entry.getKey().getSubscriptionName());
096                            ConnectionContext context = new ConnectionContext();
097                            context.setBroker(broker);
098                            context.setClientId(entry.getKey().getClientId());
099                            removeSubscription(context, info);
100                        } catch (Exception e) {
101                            LOG.error("Failed to remove inactive durable subscriber", e);
102                        }
103                    }
104                }
105            }
106        }
107    
108        @Override
109        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
110            if (info.isDurable()) {
111                ActiveMQDestination destination = info.getDestination();
112                if (!destination.isPattern()) {
113                    // Make sure the destination is created.
114                    lookup(context, destination,true);
115                }
116                String clientId = context.getClientId();
117                String subscriptionName = info.getSubscriptionName();
118                SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
119                DurableTopicSubscription sub = durableSubscriptions.get(key);
120                if (sub != null) {
121                    if (sub.isActive()) {
122                        throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
123                    }
124                    // Has the selector changed??
125                    if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
126                        // Remove the consumer first then add it.
127                        durableSubscriptions.remove(key);
128                        destinationsLock.readLock().lock();
129                        try {
130                            for (Destination dest : destinations.values()) {
131                                //Account for virtual destinations
132                                if (dest instanceof Topic){
133                                    Topic topic = (Topic)dest;
134                                    topic.deleteSubscription(context, key);
135                                }
136                            }
137                        } finally {
138                            destinationsLock.readLock().unlock();
139                        }
140                        super.removeConsumer(context, sub.getConsumerInfo());
141                        super.addConsumer(context, info);
142                        sub = durableSubscriptions.get(key);
143                    } else {
144                        // Change the consumer id key of the durable sub.
145                        if (sub.getConsumerInfo().getConsumerId() != null) {
146                            subscriptions.remove(sub.getConsumerInfo().getConsumerId());
147                        }
148                        subscriptions.put(info.getConsumerId(), sub);
149                    }
150                } else {
151                    super.addConsumer(context, info);
152                    sub = durableSubscriptions.get(key);
153                    if (sub == null) {
154                        throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
155                                               + " subscriberName: " + key.getSubscriptionName());
156                    }
157                }
158                sub.activate(usageManager, context, info, broker);
159                return sub;
160            } else {
161                return super.addConsumer(context, info);
162            }
163        }
164    
165        @Override
166        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
167            if (info.isDurable()) {
168    
169                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
170                DurableTopicSubscription sub = durableSubscriptions.get(key);
171                if (sub != null) {
172                    sub.deactivate(keepDurableSubsActive);
173                }
174    
175            } else {
176                super.removeConsumer(context, info);
177            }
178        }
179    
180        @Override
181        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
182            SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
183            DurableTopicSubscription sub = durableSubscriptions.get(key);
184            if (sub == null) {
185                throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
186            }
187            if (sub.isActive()) {
188                throw new JMSException("Durable consumer is in use");
189            } else {
190                durableSubscriptions.remove(key);
191            }
192    
193            destinationsLock.readLock().lock();
194            try {
195                for (Destination dest : destinations.values()) {
196                    //Account for virtual destinations
197                    if (dest instanceof Topic){
198                        Topic topic = (Topic)dest;
199                        topic.deleteSubscription(context, key);
200                    }
201                }
202            } finally {
203                destinationsLock.readLock().unlock();
204            }
205    
206            if (subscriptions.get(sub.getConsumerInfo().getConsumerId()) != null) {
207                super.removeConsumer(context, sub.getConsumerInfo());
208            } else {
209                // try destroying inactive subscriptions
210                destroySubscription(sub);
211            }
212        }
213    
214        @Override
215        public String toString() {
216            return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
217        }
218    
219        @Override
220        protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
221            List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
222            Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
223    
224            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
225            // Eagerly recover the durable subscriptions
226            if (store != null) {
227                SubscriptionInfo[] infos = store.getAllSubscriptions();
228                for (int i = 0; i < infos.length; i++) {
229    
230                    SubscriptionInfo info = infos[i];
231                    if (LOG.isDebugEnabled()) {
232                        LOG.debug("Restoring durable subscription: " + info);
233                    }
234                    SubscriptionKey key = new SubscriptionKey(info);
235    
236                    // A single durable sub may be subscribing to multiple topics.
237                    // so it might exist already.
238                    DurableTopicSubscription sub = durableSubscriptions.get(key);
239                    ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
240                    if (sub == null) {
241                        ConnectionContext c = new ConnectionContext();
242                        c.setBroker(context.getBroker());
243                        c.setClientId(key.getClientId());
244                        c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
245                        sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
246                    }
247    
248                    if (dupChecker.contains(sub)) {
249                        continue;
250                    }
251    
252                    dupChecker.add(sub);
253                    rc.add(sub);
254                    dest.addSubscription(context, sub);
255                }
256    
257                // Now perhaps there other durable subscriptions (via wild card)
258                // that would match this destination..
259                durableSubscriptions.values();
260                for (DurableTopicSubscription sub : durableSubscriptions.values()) {
261                    // Skip over subscriptions that we allready added..
262                    if (dupChecker.contains(sub)) {
263                        continue;
264                    }
265    
266                    if (sub.matches(dest.getActiveMQDestination())) {
267                        rc.add(sub);
268                        dest.addSubscription(context, sub);
269                    }
270                }
271            }
272            return rc;
273        }
274    
275        public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
276            ConsumerInfo rc = new ConsumerInfo();
277            rc.setSelector(info.getSelector());
278            rc.setSubscriptionName(info.getSubscriptionName());
279            rc.setDestination(info.getSubscribedDestination());
280            rc.setConsumerId(createConsumerId());
281            return rc;
282        }
283    
284        private ConsumerId createConsumerId() {
285            return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
286        }
287    
288        protected void configureTopic(Topic topic, ActiveMQDestination destination) {
289            if (broker.getDestinationPolicy() != null) {
290                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
291                if (entry != null) {
292                    entry.configure(broker,topic);
293                }
294            }
295        }
296    
297        @Override
298        protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
299            ActiveMQDestination destination = info.getDestination();
300    
301            if (info.isDurable()) {
302                if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
303                    throw new JMSException("Cannot create a durable subscription for an advisory Topic");
304                }
305                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
306                DurableTopicSubscription sub = durableSubscriptions.get(key);
307    
308                if (sub == null) {
309    
310                    sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
311    
312                    if (destination != null && broker.getDestinationPolicy() != null) {
313                        PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
314                        if (entry != null) {
315                            entry.configure(broker, usageManager, sub);
316                        }
317                    }
318                    durableSubscriptions.put(key, sub);
319                } else {
320                    throw new JMSException("That durable subscription is already active.");
321                }
322                return sub;
323            }
324            try {
325                TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
326                // lets configure the subscription depending on the destination
327                if (destination != null && broker.getDestinationPolicy() != null) {
328                    PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
329                    if (entry != null) {
330                        entry.configure(broker, usageManager, answer);
331                    }
332                }
333                answer.init();
334                return answer;
335            } catch (Exception e) {
336                LOG.error("Failed to create TopicSubscription ", e);
337                JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
338                jmsEx.setLinkedException(e);
339                throw jmsEx;
340            }
341        }
342    
343        private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
344            if (info1.getSelector() != null ^ info2.getSelector() != null) {
345                return true;
346            }
347            if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
348                return true;
349            }
350            return !info1.getDestination().equals(info2.getDestination());
351        }
352    
353        @Override
354        protected Set<ActiveMQDestination> getInactiveDestinations() {
355            Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
356            for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
357                ActiveMQDestination dest = iter.next();
358                if (!dest.isTopic()) {
359                    iter.remove();
360                }
361            }
362            return inactiveDestinations;
363        }
364    
365        public boolean isKeepDurableSubsActive() {
366            return keepDurableSubsActive;
367        }
368    
369        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
370            this.keepDurableSubsActive = keepDurableSubsActive;
371        }
372    
373        public boolean durableSubscriptionExists(SubscriptionKey key) {
374            return this.durableSubscriptions.containsKey(key);
375        }
376    }