001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import java.util.Collections;
020 import java.util.HashMap;
021 import java.util.List;
022 import java.util.Map;
023 import java.util.concurrent.CopyOnWriteArrayList;
024 import org.apache.activemq.advisory.AdvisorySupport;
025 import org.apache.activemq.broker.Broker;
026 import org.apache.activemq.broker.ConnectionContext;
027 import org.apache.activemq.broker.region.Destination;
028 import org.apache.activemq.broker.region.DurableTopicSubscription;
029 import org.apache.activemq.broker.region.MessageReference;
030 import org.apache.activemq.broker.region.Topic;
031 import org.apache.activemq.command.Message;
032 import org.apache.activemq.usage.SystemUsage;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
036 /**
037 * persist pending messages pending message (messages awaiting dispatch to a
038 * consumer) cursor
039 *
040 *
041 */
042 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
043
044 private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class);
045 private final String clientId;
046 private final String subscriberName;
047 private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
048 private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
049 private final PendingMessageCursor nonPersistent;
050 private PendingMessageCursor currentCursor;
051 private final DurableTopicSubscription subscription;
052 private boolean immediatePriorityDispatch = true;
053 /**
054 * @param broker Broker for this cursor
055 * @param clientId clientId for this cursor
056 * @param subscriberName subscriber name for this cursor
057 * @param maxBatchSize currently ignored
058 * @param subscription subscription for this cursor
059 */
060 public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, DurableTopicSubscription subscription) {
061 super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
062 this.subscription=subscription;
063 this.clientId = clientId;
064 this.subscriberName = subscriberName;
065 if (broker.getBrokerService().isPersistent()) {
066 this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
067 } else {
068 this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
069 }
070
071 this.nonPersistent.setMaxBatchSize(maxBatchSize);
072 this.nonPersistent.setSystemUsage(systemUsage);
073 this.storePrefetches.add(this.nonPersistent);
074
075 if (prioritizedMessages) {
076 setMaxAuditDepth(10*getMaxAuditDepth());
077 }
078 }
079
080 @Override
081 public synchronized void start() throws Exception {
082 if (!isStarted()) {
083 super.start();
084 for (PendingMessageCursor tsp : storePrefetches) {
085 tsp.setMessageAudit(getMessageAudit());
086 tsp.start();
087 }
088 }
089 }
090
091 @Override
092 public synchronized void stop() throws Exception {
093 if (isStarted()) {
094 if (subscription.isKeepDurableSubsActive()) {
095 super.gc();
096 for (PendingMessageCursor tsp : storePrefetches) {
097 tsp.gc();
098 }
099 } else {
100 super.stop();
101 for (PendingMessageCursor tsp : storePrefetches) {
102 tsp.stop();
103 }
104 getMessageAudit().clear();
105 }
106 }
107 }
108
109 /**
110 * Add a destination
111 *
112 * @param context
113 * @param destination
114 * @throws Exception
115 */
116 @Override
117 public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
118 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
119 TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
120 tsp.setMaxBatchSize(destination.getMaxPageSize());
121 tsp.setSystemUsage(systemUsage);
122 tsp.setMessageAudit(getMessageAudit());
123 tsp.setEnableAudit(isEnableAudit());
124 tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
125 tsp.setUseCache(isUseCache());
126 tsp.setCacheEnabled(isUseCache() && tsp.isEmpty());
127 topics.put(destination, tsp);
128 storePrefetches.add(tsp);
129 if (isStarted()) {
130 tsp.start();
131 }
132 }
133 }
134
135 /**
136 * remove a destination
137 *
138 * @param context
139 * @param destination
140 * @throws Exception
141 */
142 @Override
143 public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
144 PendingMessageCursor tsp = topics.remove(destination);
145 if (tsp != null) {
146 storePrefetches.remove(tsp);
147 }
148 return Collections.EMPTY_LIST;
149 }
150
151 /**
152 * @return true if there are no pending messages
153 */
154 @Override
155 public synchronized boolean isEmpty() {
156 for (PendingMessageCursor tsp : storePrefetches) {
157 if( !tsp.isEmpty() )
158 return false;
159 }
160 return true;
161 }
162
163 @Override
164 public synchronized boolean isEmpty(Destination destination) {
165 boolean result = true;
166 TopicStorePrefetch tsp = topics.get(destination);
167 if (tsp != null) {
168 result = tsp.isEmpty();
169 }
170 return result;
171 }
172
173 /**
174 * Informs the Broker if the subscription needs to intervention to recover
175 * it's state e.g. DurableTopicSubscriber may do
176 *
177 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
178 * @return true if recovery required
179 */
180 @Override
181 public boolean isRecoveryRequired() {
182 return false;
183 }
184
185 @Override
186 public synchronized void addMessageLast(MessageReference node) throws Exception {
187 if (node != null) {
188 Message msg = node.getMessage();
189 if (isStarted()) {
190 if (!msg.isPersistent()) {
191 nonPersistent.addMessageLast(node);
192 }
193 }
194 if (msg.isPersistent()) {
195 Destination dest = msg.getRegionDestination();
196 TopicStorePrefetch tsp = topics.get(dest);
197 if (tsp != null) {
198 tsp.addMessageLast(node);
199 if (prioritizedMessages && immediatePriorityDispatch && tsp.isPaging()) {
200 if (msg.getPriority() > tsp.getLastRecoveredPriority()) {
201 tsp.recoverMessage(node.getMessage(), true);
202 if (LOG.isTraceEnabled()) {
203 LOG.trace("cached high priority (" + msg.getPriority() + ") message:" +
204 msg.getMessageId() + ", current paged batch priority: " +
205 tsp.getLastRecoveredPriority() + ", cache size:" + tsp.batchList.size());
206 }
207 }
208 }
209 }
210 }
211
212 }
213 }
214
215 @Override
216 public boolean isTransient() {
217 return subscription.isKeepDurableSubsActive();
218 }
219
220 @Override
221 public void addMessageFirst(MessageReference node) throws Exception {
222 // for keep durable subs active, need to deal with redispatch
223 if (node != null) {
224 Message msg = node.getMessage();
225 if (!msg.isPersistent()) {
226 nonPersistent.addMessageFirst(node);
227 } else {
228 Destination dest = msg.getRegionDestination();
229 TopicStorePrefetch tsp = topics.get(dest);
230 if (tsp != null) {
231 tsp.addMessageFirst(node);
232 }
233 }
234 }
235 }
236
237 @Override
238 public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
239 nonPersistent.addMessageLast(node);
240 }
241
242 @Override
243 public synchronized void clear() {
244 for (PendingMessageCursor tsp : storePrefetches) {
245 tsp.clear();
246 }
247 }
248
249 @Override
250 public synchronized boolean hasNext() {
251 boolean result = true;
252 if (result) {
253 try {
254 currentCursor = getNextCursor();
255 } catch (Exception e) {
256 LOG.error("Failed to get current cursor ", e);
257 throw new RuntimeException(e);
258 }
259 result = currentCursor != null ? currentCursor.hasNext() : false;
260 }
261 return result;
262 }
263
264 @Override
265 public synchronized MessageReference next() {
266 MessageReference result = currentCursor != null ? currentCursor.next() : null;
267 return result;
268 }
269
270 @Override
271 public synchronized void remove() {
272 if (currentCursor != null) {
273 currentCursor.remove();
274 }
275 }
276
277 @Override
278 public synchronized void remove(MessageReference node) {
279 for (PendingMessageCursor tsp : storePrefetches) {
280 tsp.remove(node);
281 }
282 }
283
284 @Override
285 public synchronized void reset() {
286 for (PendingMessageCursor storePrefetch : storePrefetches) {
287 storePrefetch.reset();
288 }
289 }
290
291 @Override
292 public synchronized void release() {
293 for (PendingMessageCursor storePrefetch : storePrefetches) {
294 storePrefetch.release();
295 }
296 }
297
298 @Override
299 public synchronized int size() {
300 int pendingCount=0;
301 for (PendingMessageCursor tsp : storePrefetches) {
302 pendingCount += tsp.size();
303 }
304 return pendingCount;
305 }
306
307 @Override
308 public void setMaxBatchSize(int newMaxBatchSize) {
309 for (PendingMessageCursor storePrefetch : storePrefetches) {
310 storePrefetch.setMaxBatchSize(newMaxBatchSize);
311 }
312 super.setMaxBatchSize(newMaxBatchSize);
313 }
314
315 @Override
316 public synchronized void gc() {
317 for (PendingMessageCursor tsp : storePrefetches) {
318 tsp.gc();
319 }
320 }
321
322 @Override
323 public void setSystemUsage(SystemUsage usageManager) {
324 super.setSystemUsage(usageManager);
325 for (PendingMessageCursor tsp : storePrefetches) {
326 tsp.setSystemUsage(usageManager);
327 }
328 }
329
330 @Override
331 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
332 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
333 for (PendingMessageCursor cursor : storePrefetches) {
334 cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
335 }
336 }
337
338 @Override
339 public void setMaxProducersToAudit(int maxProducersToAudit) {
340 super.setMaxProducersToAudit(maxProducersToAudit);
341 for (PendingMessageCursor cursor : storePrefetches) {
342 cursor.setMaxAuditDepth(maxAuditDepth);
343 }
344 }
345
346 @Override
347 public void setMaxAuditDepth(int maxAuditDepth) {
348 super.setMaxAuditDepth(maxAuditDepth);
349 for (PendingMessageCursor cursor : storePrefetches) {
350 cursor.setMaxAuditDepth(maxAuditDepth);
351 }
352 }
353
354 @Override
355 public void setEnableAudit(boolean enableAudit) {
356 super.setEnableAudit(enableAudit);
357 for (PendingMessageCursor cursor : storePrefetches) {
358 cursor.setEnableAudit(enableAudit);
359 }
360 }
361
362 @Override
363 public void setUseCache(boolean useCache) {
364 super.setUseCache(useCache);
365 for (PendingMessageCursor cursor : storePrefetches) {
366 cursor.setUseCache(useCache);
367 }
368 }
369
370 protected synchronized PendingMessageCursor getNextCursor() throws Exception {
371 if (currentCursor == null || currentCursor.isEmpty()) {
372 currentCursor = null;
373 for (PendingMessageCursor tsp : storePrefetches) {
374 if (tsp.hasNext()) {
375 currentCursor = tsp;
376 break;
377 }
378 }
379 // round-robin
380 if (storePrefetches.size()>1) {
381 PendingMessageCursor first = storePrefetches.remove(0);
382 storePrefetches.add(first);
383 }
384 }
385 return currentCursor;
386 }
387
388 @Override
389 public String toString() {
390 return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
391 }
392
393 public boolean isImmediatePriorityDispatch() {
394 return immediatePriorityDispatch;
395 }
396
397 public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
398 this.immediatePriorityDispatch = immediatePriorityDispatch;
399 }
400
401 }