001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.Collections;
021 import java.util.concurrent.ConcurrentHashMap;
022 import java.util.concurrent.atomic.AtomicBoolean;
023 import java.util.concurrent.atomic.AtomicLong;
024
025 import javax.jms.InvalidSelectorException;
026 import javax.jms.JMSException;
027
028 import org.apache.activemq.broker.Broker;
029 import org.apache.activemq.broker.ConnectionContext;
030 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
032 import org.apache.activemq.broker.region.policy.PolicyEntry;
033 import org.apache.activemq.command.ActiveMQDestination;
034 import org.apache.activemq.command.ConsumerInfo;
035 import org.apache.activemq.command.Message;
036 import org.apache.activemq.command.MessageAck;
037 import org.apache.activemq.command.MessageDispatch;
038 import org.apache.activemq.command.MessageId;
039 import org.apache.activemq.store.TopicMessageStore;
040 import org.apache.activemq.usage.SystemUsage;
041 import org.apache.activemq.usage.Usage;
042 import org.apache.activemq.usage.UsageListener;
043 import org.apache.activemq.util.SubscriptionKey;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
048
049 private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
050 private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
051 private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
052 private final SubscriptionKey subscriptionKey;
053 private final boolean keepDurableSubsActive;
054 private AtomicBoolean active = new AtomicBoolean();
055 private AtomicLong offlineTimestamp = new AtomicLong(-1);
056
057 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
058 throws JMSException {
059 super(broker, usageManager, context, info);
060 this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
061 this.pending.setSystemUsage(usageManager);
062 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
063 this.keepDurableSubsActive = keepDurableSubsActive;
064 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
065 }
066
067 public final boolean isActive() {
068 return active.get();
069 }
070
071 public final long getOfflineTimestamp() {
072 return offlineTimestamp.get();
073 }
074
075 public boolean isFull() {
076 return !active.get() || super.isFull();
077 }
078
079 public void gc() {
080 }
081
082 /**
083 * store will have a pending ack for all durables, irrespective of the
084 * selector so we need to ack if node is un-matched
085 */
086 public void unmatched(MessageReference node) throws IOException {
087 MessageAck ack = new MessageAck();
088 ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
089 ack.setMessageID(node.getMessageId());
090 node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
091 }
092
093 @Override
094 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
095 // statically configured via maxPageSize
096 }
097
098 public void add(ConnectionContext context, Destination destination) throws Exception {
099 if (!destinations.contains(destination)) {
100 super.add(context, destination);
101 }
102 // do it just once per destination
103 if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
104 return;
105 }
106 durableDestinations.put(destination.getActiveMQDestination(), destination);
107
108 if (active.get() || keepDurableSubsActive) {
109 Topic topic = (Topic) destination;
110 topic.activate(context, this);
111 if (pending.isEmpty(topic)) {
112 topic.recoverRetroactiveMessages(context, this);
113 }
114 this.enqueueCounter += pending.size();
115 } else if (destination.getMessageStore() != null) {
116 TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
117 try {
118 this.enqueueCounter += store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName());
119 } catch (IOException e) {
120 JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e);
121 jmsEx.setLinkedException(e);
122 throw jmsEx;
123 }
124 }
125 dispatchPending();
126 }
127
128 public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
129 if (!active.get()) {
130 this.context = context;
131 this.info = info;
132
133 // On Activation we should update the configuration based on our new consumer info.
134 ActiveMQDestination dest = this.info.getDestination();
135 if (dest != null && regionBroker.getDestinationPolicy() != null) {
136 PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
137 if (entry != null) {
138 entry.configure(broker, usageManager, this);
139 }
140 }
141
142 LOG.debug("Activating " + this);
143 if (!keepDurableSubsActive) {
144 for (Destination destination : durableDestinations.values()) {
145 Topic topic = (Topic) destination;
146 add(context, topic);
147 topic.activate(context, this);
148 }
149 }
150 synchronized (pendingLock) {
151 pending.setSystemUsage(memoryManager);
152 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
153 pending.setMaxAuditDepth(getMaxAuditDepth());
154 pending.setMaxProducersToAudit(getMaxProducersToAudit());
155 pending.start();
156 // If nothing was in the persistent store, then try to use the
157 // recovery policy.
158 if (pending.isEmpty()) {
159 for (Destination destination : durableDestinations.values()) {
160 Topic topic = (Topic) destination;
161 topic.recoverRetroactiveMessages(context, this);
162 }
163 }
164 }
165 this.active.set(true);
166 this.offlineTimestamp.set(-1);
167 dispatchPending();
168 this.usageManager.getMemoryUsage().addUsageListener(this);
169 }
170 }
171
172 public void deactivate(boolean keepDurableSubsActive) throws Exception {
173 LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this);
174 active.set(false);
175 offlineTimestamp.set(System.currentTimeMillis());
176 this.usageManager.getMemoryUsage().removeUsageListener(this);
177 synchronized (pendingLock) {
178 pending.stop();
179
180 synchronized (dispatchLock) {
181 for (Destination destination : durableDestinations.values()) {
182 Topic topic = (Topic) destination;
183 if (!keepDurableSubsActive) {
184 topic.deactivate(context, this);
185 } else {
186 topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
187 }
188 }
189
190 // Before we add these back to pending they need to be in producer order not
191 // dispatch order so we can add them to the front of the pending list.
192 Collections.reverse(dispatched);
193
194 for (final MessageReference node : dispatched) {
195 // Mark the dispatched messages as redelivered for next time.
196 Integer count = redeliveredMessages.get(node.getMessageId());
197 if (count != null) {
198 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
199 } else {
200 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
201 }
202 if (keepDurableSubsActive && pending.isTransient()) {
203 pending.addMessageFirst(node);
204 pending.rollback(node.getMessageId());
205 } else {
206 node.decrementReferenceCount();
207 }
208 }
209
210 dispatched.clear();
211 }
212 if (!keepDurableSubsActive && pending.isTransient()) {
213 try {
214 pending.reset();
215 while (pending.hasNext()) {
216 MessageReference node = pending.next();
217 node.decrementReferenceCount();
218 pending.remove();
219 }
220 } finally {
221 pending.release();
222 }
223 }
224 }
225 prefetchExtension.set(0);
226 }
227
228 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
229 MessageDispatch md = super.createMessageDispatch(node, message);
230 if (node != QueueMessageReference.NULL_MESSAGE) {
231 Integer count = redeliveredMessages.get(node.getMessageId());
232 if (count != null) {
233 md.setRedeliveryCounter(count.intValue());
234 }
235 }
236 return md;
237 }
238
239 public void add(MessageReference node) throws Exception {
240 if (!active.get() && !keepDurableSubsActive) {
241 return;
242 }
243 super.add(node);
244 }
245
246 protected void dispatchPending() throws IOException {
247 if (isActive()) {
248 super.dispatchPending();
249 }
250 }
251
252 public void removePending(MessageReference node) throws IOException {
253 pending.remove(node);
254 }
255
256 protected void doAddRecoveredMessage(MessageReference message) throws Exception {
257 synchronized (pending) {
258 pending.addRecoveredMessage(message);
259 }
260 }
261
262 public int getPendingQueueSize() {
263 if (active.get() || keepDurableSubsActive) {
264 return super.getPendingQueueSize();
265 }
266 // TODO: need to get from store
267 return 0;
268 }
269
270 public void setSelector(String selector) throws InvalidSelectorException {
271 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
272 }
273
274 protected boolean canDispatch(MessageReference node) {
275 return isActive();
276 }
277
278 protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
279 node.getRegionDestination().acknowledge(context, this, ack, node);
280 redeliveredMessages.remove(node.getMessageId());
281 node.decrementReferenceCount();
282 }
283
284 public synchronized String toString() {
285 return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
286 + durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter
287 + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
288 }
289
290 public SubscriptionKey getSubscriptionKey() {
291 return subscriptionKey;
292 }
293
294 /**
295 * Release any references that we are holding.
296 */
297 public void destroy() {
298 synchronized (pendingLock) {
299 try {
300 pending.reset();
301 while (pending.hasNext()) {
302 MessageReference node = pending.next();
303 node.decrementReferenceCount();
304 }
305
306 } finally {
307 pending.release();
308 pending.clear();
309 }
310 }
311 synchronized (dispatchLock) {
312 for (MessageReference node : dispatched) {
313 node.decrementReferenceCount();
314 }
315 dispatched.clear();
316 }
317 setSlowConsumer(false);
318 }
319
320 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
321 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
322 try {
323 dispatchPending();
324 } catch (IOException e) {
325 LOG.warn("problem calling dispatchMatched", e);
326 }
327 }
328 }
329
330 protected boolean isDropped(MessageReference node) {
331 return false;
332 }
333
334 public boolean isKeepDurableSubsActive() {
335 return keepDurableSubsActive;
336 }
337 }