001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.concurrent.ConcurrentHashMap;
022
023 import javax.jms.InvalidSelectorException;
024 import javax.jms.JMSException;
025
026 import org.apache.activemq.broker.Broker;
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ConsumerInfo;
031 import org.apache.activemq.command.Message;
032 import org.apache.activemq.command.MessageAck;
033 import org.apache.activemq.command.MessageDispatch;
034 import org.apache.activemq.command.MessageId;
035 import org.apache.activemq.store.TopicMessageStore;
036 import org.apache.activemq.usage.SystemUsage;
037 import org.apache.activemq.usage.Usage;
038 import org.apache.activemq.usage.UsageListener;
039 import org.apache.activemq.util.SubscriptionKey;
040 import org.apache.commons.logging.Log;
041 import org.apache.commons.logging.LogFactory;
042
043 public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
044
045 private static final Log LOG = LogFactory.getLog(DurableTopicSubscription.class);
046 private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
047 private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
048 private final SubscriptionKey subscriptionKey;
049 private final boolean keepDurableSubsActive;
050 private boolean active;
051
052 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
053 throws JMSException {
054 super(broker,usageManager, context, info);
055 this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
056 this.pending.setSystemUsage(usageManager);
057 this.keepDurableSubsActive = keepDurableSubsActive;
058 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
059
060 }
061
062 public boolean isActive() {
063 return active;
064 }
065
066 public boolean isFull() {
067 return !active || super.isFull();
068 }
069
070 public void gc() {
071 }
072
073 public void add(ConnectionContext context, Destination destination) throws Exception {
074 super.add(context, destination);
075 destinations.put(destination.getActiveMQDestination(), destination);
076 if (destination.getMessageStore() != null) {
077 TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
078 try {
079 this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
080 } catch (IOException e) {
081 JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
082 jmsEx.setLinkedException(e);
083 throw jmsEx;
084 }
085 }
086 if (active || keepDurableSubsActive) {
087 Topic topic = (Topic)destination;
088 topic.activate(context, this);
089 if (pending.isEmpty(topic)) {
090 topic.recoverRetroactiveMessages(context, this);
091 }
092 }
093 dispatchPending();
094 }
095
096 public void activate(SystemUsage memoryManager, ConnectionContext context,
097 ConsumerInfo info) throws Exception {
098 LOG.debug("Activating " + this);
099 if (!active) {
100 this.active = true;
101 this.context = context;
102 this.info = info;
103 int prefetch = info.getPrefetchSize();
104 if (prefetch>0) {
105 prefetch += prefetch/2;
106 }
107 int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
108 this.pending.setMaxAuditDepth(depth);
109 if (!keepDurableSubsActive) {
110 for (Iterator<Destination> iter = destinations.values()
111 .iterator(); iter.hasNext();) {
112 Topic topic = (Topic) iter.next();
113 topic.activate(context, this);
114 }
115 }
116 synchronized (pending) {
117 pending.setSystemUsage(memoryManager);
118 pending.start();
119
120 // If nothing was in the persistent store, then try to use the
121 // recovery policy.
122 if (pending.isEmpty()) {
123 for (Iterator<Destination> iter = destinations.values()
124 .iterator(); iter.hasNext();) {
125 Topic topic = (Topic) iter.next();
126 topic.recoverRetroactiveMessages(context, this);
127 }
128 }
129 }
130 dispatchPending();
131 this.usageManager.getMemoryUsage().addUsageListener(this);
132 }
133 }
134
135 public void deactivate(boolean keepDurableSubsActive) throws Exception {
136 active = false;
137 this.usageManager.getMemoryUsage().removeUsageListener(this);
138 synchronized (pending) {
139 pending.stop();
140 }
141 if (!keepDurableSubsActive) {
142 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
143 Topic topic = (Topic)iter.next();
144 topic.deactivate(context, this);
145 }
146 }
147 for (final MessageReference node : dispatched) {
148 // Mark the dispatched messages as redelivered for next time.
149 Integer count = redeliveredMessages.get(node.getMessageId());
150 if (count != null) {
151 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
152 } else {
153 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
154 }
155 if (keepDurableSubsActive&& pending.isTransient()) {
156 synchronized (pending) {
157 pending.addMessageFirst(node);
158 }
159 } else {
160 node.decrementReferenceCount();
161 }
162 }
163 synchronized(dispatched) {
164 dispatched.clear();
165 }
166 if (!keepDurableSubsActive && pending.isTransient()) {
167 synchronized (pending) {
168 try {
169 pending.reset();
170 while (pending.hasNext()) {
171 MessageReference node = pending.next();
172 node.decrementReferenceCount();
173 pending.remove();
174 }
175 } finally {
176 pending.release();
177 }
178 }
179 }
180 prefetchExtension = 0;
181 }
182
183
184 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
185 MessageDispatch md = super.createMessageDispatch(node, message);
186 Integer count = redeliveredMessages.get(node.getMessageId());
187 if (count != null) {
188 md.setRedeliveryCounter(count.intValue());
189 }
190 return md;
191 }
192
193 public void add(MessageReference node) throws Exception {
194 if (!active && !keepDurableSubsActive) {
195 return;
196 }
197 super.add(node);
198 }
199
200 protected void doAddRecoveredMessage(MessageReference message) throws Exception {
201 synchronized(pending) {
202 pending.addRecoveredMessage(message);
203 }
204 }
205
206 public int getPendingQueueSize() {
207 if (active || keepDurableSubsActive) {
208 return super.getPendingQueueSize();
209 }
210 // TODO: need to get from store
211 return 0;
212 }
213
214 public void setSelector(String selector) throws InvalidSelectorException {
215 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
216 }
217
218 protected boolean canDispatch(MessageReference node) {
219 return active;
220 }
221
222 protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
223 node.getRegionDestination().acknowledge(context, this, ack, node);
224 redeliveredMessages.remove(node.getMessageId());
225 node.decrementReferenceCount();
226 }
227
228
229 public synchronized String toString() {
230 return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
231 + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
232 }
233
234 public SubscriptionKey getSubscriptionKey() {
235 return subscriptionKey;
236 }
237
238 /**
239 * Release any references that we are holding.
240 */
241 public void destroy() {
242 synchronized (pending) {
243 try {
244
245 pending.reset();
246 while (pending.hasNext()) {
247 MessageReference node = pending.next();
248 node.decrementReferenceCount();
249 }
250
251 } finally {
252 pending.release();
253 pending.clear();
254 }
255 }
256 synchronized(dispatched) {
257 for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
258 MessageReference node = (MessageReference) iter.next();
259 node.decrementReferenceCount();
260 }
261 dispatched.clear();
262 }
263 }
264
265 /**
266 * @param usageManager
267 * @param oldPercentUsage
268 * @param newPercentUsage
269 * @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
270 * int, int)
271 */
272 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
273 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
274 try {
275 dispatchPending();
276 } catch (IOException e) {
277 LOG.warn("problem calling dispatchMatched", e);
278 }
279 }
280 }
281
282 protected boolean isDropped(MessageReference node) {
283 return false;
284 }
285 }