001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.DataInputStream;
020 import java.io.IOException;
021 import java.io.InterruptedIOException;
022 import java.util.ArrayList;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.Iterator;
026 import java.util.LinkedList;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.Map.Entry;
031 import java.util.concurrent.*;
032 import java.util.concurrent.atomic.AtomicBoolean;
033 import java.util.concurrent.atomic.AtomicInteger;
034 import org.apache.activemq.broker.ConnectionContext;
035 import org.apache.activemq.broker.region.Destination;
036 import org.apache.activemq.broker.region.RegionBroker;
037 import org.apache.activemq.command.ActiveMQDestination;
038 import org.apache.activemq.command.ActiveMQQueue;
039 import org.apache.activemq.command.ActiveMQTempQueue;
040 import org.apache.activemq.command.ActiveMQTempTopic;
041 import org.apache.activemq.command.ActiveMQTopic;
042 import org.apache.activemq.command.Message;
043 import org.apache.activemq.command.MessageAck;
044 import org.apache.activemq.command.MessageId;
045 import org.apache.activemq.command.ProducerId;
046 import org.apache.activemq.command.SubscriptionInfo;
047 import org.apache.activemq.command.TransactionId;
048 import org.apache.activemq.openwire.OpenWireFormat;
049 import org.apache.activemq.protobuf.Buffer;
050 import org.apache.activemq.store.AbstractMessageStore;
051 import org.apache.activemq.store.MessageRecoveryListener;
052 import org.apache.activemq.store.MessageStore;
053 import org.apache.activemq.store.PersistenceAdapter;
054 import org.apache.activemq.store.TopicMessageStore;
055 import org.apache.activemq.store.TransactionStore;
056 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
057 import org.apache.activemq.store.kahadb.data.KahaDestination;
058 import org.apache.activemq.store.kahadb.data.KahaLocation;
059 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
060 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
061 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
062 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
063 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
064 import org.apache.activemq.usage.MemoryUsage;
065 import org.apache.activemq.usage.SystemUsage;
066 import org.apache.activemq.util.ServiceStopper;
067 import org.apache.activemq.util.ThreadPoolUtils;
068 import org.apache.activemq.wireformat.WireFormat;
069 import org.slf4j.Logger;
070 import org.slf4j.LoggerFactory;
071 import org.apache.kahadb.journal.Location;
072 import org.apache.kahadb.page.Transaction;
073
074 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
/** Logger used by this store and by its nested message-store classes. */
static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
/** Default value of {@code maxAsyncJobs}. */
private static final int MAX_ASYNC_JOBS = 10000;

/** Name of the system property that is parsed into {@link #cancelledTaskModMetric}. */
public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
// Parsed once at class-load time as a base-10 int; "0" is used when the property is unset.
// NOTE(review): presumably a sampling interval for cancelled-task metrics - confirm against its users.
public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
        PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
/** Name of the system property that is parsed into {@code asyncExecutorMaxThreads}. */
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
// Parsed once at class-load time as a base-10 int; "1" is used when the property is unset.
// Passed as the second constructor argument of both StoreTaskExecutor instances in doStart().
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
        PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;

// Executors created in doStart() and shut down (then nulled) in doStop().
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
// Per-destination async task maps; doStop() cancels every task found in them and then clears the lists.
protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
// Wire format used to marshal/unmarshal messages, acks and subscription infos stored by this adapter.
final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
// Bounded work queues (capacity getMaxAsyncJobs()) handed to the two executors in doStart().
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
// Created in doStart() with getMaxAsyncJobs() permits; doStop() tries to take all permits to drain async work.
Semaphore globalQueueSemaphore;
Semaphore globalTopicSemaphore;
private boolean concurrentStoreAndDispatchQueues = true;
// when true, message order may be compromised when cache is exhausted if store is out
// of order w.r.t cache
private boolean concurrentStoreAndDispatchTopics = false;
private boolean concurrentStoreAndDispatchTransactions = false;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
// Single transaction store instance; it is returned by createTransactionStore() and proxies every message store.
private final KahaDBTransactionStore transactionStore;
// Converts broker TransactionIds into the KahaTransactionInfo written with journal commands.
private TransactionIdTransformer transactionIdTransformer;
103
/**
 * Creates the store together with its transaction store and installs the
 * default transaction-id transformer, which delegates to
 * {@code TransactionIdConversion.convert}.
 */
public KahaDBStore() {
    // NOTE(review): 'this' escapes to the transaction store before construction completes;
    // keep this statement order unless KahaDBTransactionStore's constructor is checked.
    this.transactionStore = new KahaDBTransactionStore(this);
    this.transactionIdTransformer = new TransactionIdTransformer() {
        @Override
        public KahaTransactionInfo transform(TransactionId txid) {
            return TransactionIdConversion.convert(txid);
        }
    };
}
113
114 @Override
115 public String toString() {
116 return "KahaDB:[" + directory.getAbsolutePath() + "]";
117 }
118
/**
 * Accepts the broker name but does nothing with it; the body is empty.
 *
 * @param brokerName
 *            ignored
 */
public void setBrokerName(String brokerName) {
}

/**
 * @param usageManager
 *            the system usage object to remember; only stored here and
 *            handed back by {@link #getUsageManager()}
 */
public void setUsageManager(SystemUsage usageManager) {
    this.usageManager = usageManager;
}

/**
 * @return the system usage object last passed to
 *         {@link #setUsageManager(SystemUsage)}, or {@code null} if none
 *         was set
 */
public SystemUsage getUsageManager() {
    return this.usageManager;
}

/**
 * @return {@code true} (the default) when queue messages are added through
 *         an asynchronous store task rather than the superclass path, see
 *         {@code KahaDBMessageStore.asyncAddQueueMessage}
 */
public boolean isConcurrentStoreAndDispatchQueues() {
    return this.concurrentStoreAndDispatchQueues;
}

/**
 * @param concurrentStoreAndDispatch
 *            whether queue messages should be stored through asynchronous
 *            store tasks
 */
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
    this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
}

/**
 * @return {@code true} when topic messages are added through an
 *         asynchronous store task; defaults to {@code false}
 */
public boolean isConcurrentStoreAndDispatchTopics() {
    return this.concurrentStoreAndDispatchTopics;
}

/**
 * @param concurrentStoreAndDispatch
 *            whether topic messages should be stored through asynchronous
 *            store tasks
 */
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
    this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
}

/**
 * @return the value of the concurrentStoreAndDispatchTransactions flag;
 *         defaults to {@code false}
 */
public boolean isConcurrentStoreAndDispatchTransactions() {
    return this.concurrentStoreAndDispatchTransactions;
}

/**
 * @return the maximum number of asynchronous jobs; used to size the
 *         semaphores and job queues created in {@link #doStart()}
 */
public int getMaxAsyncJobs() {
    return this.maxAsyncJobs;
}
/**
 * @param maxAsyncJobs
 *            the maximum number of asynchronous jobs. The value is read
 *            when {@link #doStart()} runs and when a message store is
 *            constructed.
 */
public void setMaxAsyncJobs(int maxAsyncJobs) {
    this.maxAsyncJobs = maxAsyncJobs;
}
177
178 @Override
179 public void doStart() throws Exception {
180 super.doStart();
181 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
182 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
183 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
184 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
185 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
186 asyncQueueJobQueue, new ThreadFactory() {
187 public Thread newThread(Runnable runnable) {
188 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
189 thread.setDaemon(true);
190 return thread;
191 }
192 });
193 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
194 asyncTopicJobQueue, new ThreadFactory() {
195 public Thread newThread(Runnable runnable) {
196 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
197 thread.setDaemon(true);
198 return thread;
199 }
200 });
201 }
202
203 @Override
204 public void doStop(ServiceStopper stopper) throws Exception {
205 // drain down async jobs
206 LOG.info("Stopping async queue tasks");
207 if (this.globalQueueSemaphore != null) {
208 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
209 }
210 synchronized (this.asyncQueueMaps) {
211 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
212 synchronized (m) {
213 for (StoreTask task : m.values()) {
214 task.cancel();
215 }
216 }
217 }
218 this.asyncQueueMaps.clear();
219 }
220 LOG.info("Stopping async topic tasks");
221 if (this.globalTopicSemaphore != null) {
222 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
223 }
224 synchronized (this.asyncTopicMaps) {
225 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
226 synchronized (m) {
227 for (StoreTask task : m.values()) {
228 task.cancel();
229 }
230 }
231 }
232 this.asyncTopicMaps.clear();
233 }
234 if (this.globalQueueSemaphore != null) {
235 this.globalQueueSemaphore.drainPermits();
236 }
237 if (this.globalTopicSemaphore != null) {
238 this.globalTopicSemaphore.drainPermits();
239 }
240 if (this.queueExecutor != null) {
241 ThreadPoolUtils.shutdownNow(queueExecutor);
242 queueExecutor = null;
243 }
244 if (this.topicExecutor != null) {
245 ThreadPoolUtils.shutdownNow(topicExecutor);
246 topicExecutor = null;
247 }
248 LOG.info("Stopped KahaDB");
249 super.doStop(stopper);
250 }
251
252 void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
253 Location location;
254 this.indexLock.writeLock().lock();
255 try {
256 location = findMessageLocation(key, destination);
257 } finally {
258 this.indexLock.writeLock().unlock();
259 }
260
261 if (location != null) {
262 KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
263 Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
264
265 message.incrementRedeliveryCounter();
266 if (LOG.isTraceEnabled()) {
267 LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
268 }
269 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
270 addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
271
272 final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
273
274 this.indexLock.writeLock().lock();
275 try {
276 pageFile.tx().execute(new Transaction.Closure<IOException>() {
277 public void execute(Transaction tx) throws IOException {
278 StoredDestination sd = getStoredDestination(destination, tx);
279 Long sequence = sd.messageIdIndex.get(tx, key);
280 MessageKeys keys = sd.orderIndex.get(tx, sequence);
281 sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
282 }
283 });
284 } finally {
285 this.indexLock.writeLock().unlock();
286 }
287 }
288 }
289
290 @Override
291 void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
292 if (brokerService != null) {
293 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
294 if (regionBroker != null) {
295 Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination));
296 for (Destination destination : destinationSet) {
297 destination.getDestinationStatistics().getMessages().decrement();
298 destination.getDestinationStatistics().getEnqueues().decrement();
299 }
300 }
301 }
302 }
303
304 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
305 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
306 public Location execute(Transaction tx) throws IOException {
307 StoredDestination sd = getStoredDestination(destination, tx);
308 Long sequence = sd.messageIdIndex.get(tx, key);
309 if (sequence == null) {
310 return null;
311 }
312 return sd.orderIndex.get(tx, sequence).location;
313 }
314 });
315 }
316
317 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
318 StoreQueueTask task = null;
319 synchronized (store.asyncTaskMap) {
320 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
321 }
322 return task;
323 }
324
325 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
326 synchronized (store.asyncTaskMap) {
327 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
328 }
329 this.queueExecutor.execute(task);
330 }
331
332 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
333 StoreTopicTask task = null;
334 synchronized (store.asyncTaskMap) {
335 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
336 }
337 return task;
338 }
339
340 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
341 synchronized (store.asyncTaskMap) {
342 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
343 }
344 this.topicExecutor.execute(task);
345 }
346
/**
 * Returns the transaction store of this adapter. No new object is created:
 * every call returns the instance built in the constructor.
 *
 * @return the shared transaction store
 * @throws IOException
 *             declared by the interface; not thrown here
 */
public TransactionStore createTransactionStore() throws IOException {
    return this.transactionStore;
}

/**
 * @return the current value of the {@code forceRecoverIndex} flag
 */
public boolean getForceRecoverIndex() {
    return this.forceRecoverIndex;
}

/**
 * @param forceRecoverIndex
 *            new value for the {@code forceRecoverIndex} flag
 */
public void setForceRecoverIndex(boolean forceRecoverIndex) {
    this.forceRecoverIndex = forceRecoverIndex;
}
358
359 public class KahaDBMessageStore extends AbstractMessageStore {
// Pending async store tasks of this destination, keyed by message id + destination.
// Every access in this file happens inside synchronized (asyncTaskMap).
protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
// Store-level form of the destination, produced by convert(destination) in the constructor.
protected KahaDestination dest;
// Snapshot of getMaxAsyncJobs() taken at construction; later changes to the outer setting are not seen.
private final int maxAsyncJobs;
// Per-destination permits: single permits via acquireLocalAsyncLock(), all of them via lockAsyncJobQueue().
private final Semaphore localDestinationSemaphore;

// NOTE(review): only canceledTasks has an explicit initializer; doneTasks relies on the field default of 0.
// Presumably counters kept by the store tasks - confirm against their users.
double doneTasks, canceledTasks = 0;

/**
 * Creates a message store for one destination.
 *
 * @param destination
 *            the broker destination this store serves
 */
public KahaDBMessageStore(ActiveMQDestination destination) {
    super(destination);
    this.dest = convert(destination);
    this.maxAsyncJobs = getMaxAsyncJobs();
    this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
}

/**
 * @return the broker destination this store was created for
 */
@Override
public ActiveMQDestination getDestination() {
    return destination;
}
378
379 @Override
380 public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
381 throws IOException {
382 if (isConcurrentStoreAndDispatchQueues()) {
383 StoreQueueTask result = new StoreQueueTask(this, context, message);
384 result.aquireLocks();
385 addQueueTask(this, result);
386 return result.getFuture();
387 } else {
388 return super.asyncAddQueueMessage(context, message);
389 }
390 }
391
392 @Override
393 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
394 if (isConcurrentStoreAndDispatchQueues()) {
395 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
396 StoreQueueTask task = null;
397 synchronized (asyncTaskMap) {
398 task = (StoreQueueTask) asyncTaskMap.get(key);
399 }
400 if (task != null) {
401 if (!task.cancel()) {
402 try {
403
404 task.future.get();
405 } catch (InterruptedException e) {
406 throw new InterruptedIOException(e.toString());
407 } catch (Exception ignored) {
408 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
409 }
410 removeMessage(context, ack);
411 } else {
412 synchronized (asyncTaskMap) {
413 asyncTaskMap.remove(key);
414 }
415 }
416 } else {
417 removeMessage(context, ack);
418 }
419 } else {
420 removeMessage(context, ack);
421 }
422 }
423
424 public void addMessage(ConnectionContext context, Message message) throws IOException {
425 KahaAddMessageCommand command = new KahaAddMessageCommand();
426 command.setDestination(dest);
427 command.setMessageId(message.getMessageId().toString());
428 command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
429 command.setPriority(message.getPriority());
430 command.setPrioritySupported(isPrioritizedMessages());
431 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
432 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
433 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
434
435 }
436
437 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
438 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
439 command.setDestination(dest);
440 command.setMessageId(ack.getLastMessageId().toString());
441 command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
442
443 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
444 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
445 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
446 }
447
448 public void removeAllMessages(ConnectionContext context) throws IOException {
449 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
450 command.setDestination(dest);
451 store(command, true, null, null);
452 }
453
454 public Message getMessage(MessageId identity) throws IOException {
455 final String key = identity.toString();
456
457 // Hopefully one day the page file supports concurrent read
458 // operations... but for now we must
459 // externally synchronize...
460 Location location;
461 indexLock.writeLock().lock();
462 try {
463 location = findMessageLocation(key, dest);
464 }finally {
465 indexLock.writeLock().unlock();
466 }
467 if (location == null) {
468 return null;
469 }
470
471 return loadMessage(location);
472 }
473
474 public int getMessageCount() throws IOException {
475 try {
476 lockAsyncJobQueue();
477 indexLock.writeLock().lock();
478 try {
479 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
480 public Integer execute(Transaction tx) throws IOException {
481 // Iterate through all index entries to get a count
482 // of
483 // messages in the destination.
484 StoredDestination sd = getStoredDestination(dest, tx);
485 int rc = 0;
486 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
487 .hasNext();) {
488 iterator.next();
489 rc++;
490 }
491 return rc;
492 }
493 });
494 }finally {
495 indexLock.writeLock().unlock();
496 }
497 } finally {
498 unlockAsyncJobQueue();
499 }
500 }
501
502 @Override
503 public boolean isEmpty() throws IOException {
504 indexLock.writeLock().lock();
505 try {
506 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
507 public Boolean execute(Transaction tx) throws IOException {
508 // Iterate through all index entries to get a count of
509 // messages in the destination.
510 StoredDestination sd = getStoredDestination(dest, tx);
511 return sd.locationIndex.isEmpty(tx);
512 }
513 });
514 }finally {
515 indexLock.writeLock().unlock();
516 }
517 }
518
519 public void recover(final MessageRecoveryListener listener) throws Exception {
520 // recovery may involve expiry which will modify
521 indexLock.writeLock().lock();
522 try {
523 pageFile.tx().execute(new Transaction.Closure<Exception>() {
524 public void execute(Transaction tx) throws Exception {
525 StoredDestination sd = getStoredDestination(dest, tx);
526 sd.orderIndex.resetCursorPosition();
527 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
528 .hasNext(); ) {
529 Entry<Long, MessageKeys> entry = iterator.next();
530 if (ackedAndPrepared.contains(entry.getValue().messageId)) {
531 continue;
532 }
533 Message msg = loadMessage(entry.getValue().location);
534 listener.recoverMessage(msg);
535 }
536 }
537 });
538 }finally {
539 indexLock.writeLock().unlock();
540 }
541 }
542
543
544 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
545 indexLock.writeLock().lock();
546 try {
547 pageFile.tx().execute(new Transaction.Closure<Exception>() {
548 public void execute(Transaction tx) throws Exception {
549 StoredDestination sd = getStoredDestination(dest, tx);
550 Entry<Long, MessageKeys> entry = null;
551 int counter = 0;
552 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
553 listener.hasSpace() && iterator.hasNext(); ) {
554 entry = iterator.next();
555 if (ackedAndPrepared.contains(entry.getValue().messageId)) {
556 continue;
557 }
558 Message msg = loadMessage(entry.getValue().location);
559 listener.recoverMessage(msg);
560 counter++;
561 if (counter >= maxReturned) {
562 break;
563 }
564 }
565 sd.orderIndex.stoppedIterating();
566 }
567 });
568 }finally {
569 indexLock.writeLock().unlock();
570 }
571 }
572
573 public void resetBatching() {
574 if (pageFile.isLoaded()) {
575 indexLock.writeLock().lock();
576 try {
577 pageFile.tx().execute(new Transaction.Closure<Exception>() {
578 public void execute(Transaction tx) throws Exception {
579 StoredDestination sd = getExistingStoredDestination(dest, tx);
580 if (sd != null) {
581 sd.orderIndex.resetCursorPosition();}
582 }
583 });
584 } catch (Exception e) {
585 LOG.error("Failed to reset batching",e);
586 }finally {
587 indexLock.writeLock().unlock();
588 }
589 }
590 }
591
592 @Override
593 public void setBatch(MessageId identity) throws IOException {
594 try {
595 final String key = identity.toString();
596 lockAsyncJobQueue();
597
598 // Hopefully one day the page file supports concurrent read
599 // operations... but for now we must
600 // externally synchronize...
601
602 indexLock.writeLock().lock();
603 try {
604 pageFile.tx().execute(new Transaction.Closure<IOException>() {
605 public void execute(Transaction tx) throws IOException {
606 StoredDestination sd = getStoredDestination(dest, tx);
607 Long location = sd.messageIdIndex.get(tx, key);
608 if (location != null) {
609 sd.orderIndex.setBatch(tx, location);
610 }
611 }
612 });
613 } finally {
614 indexLock.writeLock().unlock();
615 }
616 } finally {
617 unlockAsyncJobQueue();
618 }
619 }
620
/**
 * Ignores the given memory usage; the body is empty.
 *
 * @param memoeyUSage
 *            ignored
 */
@Override
public void setMemoryUsage(MemoryUsage memoeyUSage) {
}
/** Delegates to the superclass without adding behaviour. */
@Override
public void start() throws Exception {
    super.start();
}
/** Delegates to the superclass without adding behaviour. */
@Override
public void stop() throws Exception {
    super.stop();
}
632
633 protected void lockAsyncJobQueue() {
634 try {
635 this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
636 } catch (Exception e) {
637 LOG.error("Failed to lock async jobs for " + this.destination, e);
638 }
639 }
640
/**
 * Releases {@code maxAsyncJobs} permits on this destination's semaphore,
 * the counterpart of {@code lockAsyncJobQueue()}.
 * NOTE(review): the release is unconditional - it also happens when the
 * preceding lock attempt did not obtain the permits.
 */
protected void unlockAsyncJobQueue() {
    this.localDestinationSemaphore.release(this.maxAsyncJobs);
}

/**
 * Takes one permit of this destination's semaphore, blocking until one is
 * available. An interruption is logged and the method returns without a
 * permit.
 * NOTE(review): the interrupt status is not restored after the catch.
 */
protected void acquireLocalAsyncLock() {
    try {
        this.localDestinationSemaphore.acquire();
    } catch (InterruptedException e) {
        LOG.error("Failed to aquire async lock for " + this.destination, e);
    }
}

/**
 * Returns one permit to this destination's semaphore.
 */
protected void releaseLocalAsyncLock() {
    this.localDestinationSemaphore.release();
}
656
657 }
658
659 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
// Number of durable subscriptions: seeded from the index in the constructor,
// then adjusted by addSubsciption() and deleteSubscription().
private final AtomicInteger subscriptionCount = new AtomicInteger();

/**
 * Creates a topic message store, seeds the subscription count from the
 * subscriptions already in the index and registers this store's async
 * task map with the outer store so that {@code doStop} can cancel its
 * pending tasks.
 *
 * @param destination
 *            the topic this store serves
 * @throws IOException
 *             if the existing subscriptions cannot be read
 */
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
    super(destination);
    this.subscriptionCount.set(getAllSubscriptions().length);
    // doStop() iterates and clears this LinkedList under the same monitor;
    // add under it too so the list is never modified concurrently.
    synchronized (asyncTopicMaps) {
        asyncTopicMaps.add(asyncTaskMap);
    }
}
666
667 @Override
668 public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
669 throws IOException {
670 if (isConcurrentStoreAndDispatchTopics()) {
671 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
672 result.aquireLocks();
673 addTopicTask(this, result);
674 return result.getFuture();
675 } else {
676 return super.asyncAddTopicMessage(context, message);
677 }
678 }
679
680 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
681 MessageId messageId, MessageAck ack)
682 throws IOException {
683 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
684 if (isConcurrentStoreAndDispatchTopics()) {
685 AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
686 StoreTopicTask task = null;
687 synchronized (asyncTaskMap) {
688 task = (StoreTopicTask) asyncTaskMap.get(key);
689 }
690 if (task != null) {
691 if (task.addSubscriptionKey(subscriptionKey)) {
692 removeTopicTask(this, messageId);
693 if (task.cancel()) {
694 synchronized (asyncTaskMap) {
695 asyncTaskMap.remove(key);
696 }
697 }
698 }
699 } else {
700 doAcknowledge(context, subscriptionKey, messageId, ack);
701 }
702 } else {
703 doAcknowledge(context, subscriptionKey, messageId, ack);
704 }
705 }
706
707 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
708 throws IOException {
709 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
710 command.setDestination(dest);
711 command.setSubscriptionKey(subscriptionKey);
712 command.setMessageId(messageId.toString());
713 command.setTransactionInfo(ack != null ? transactionIdTransformer.transform(ack.getTransactionId()) : null);
714 if (ack != null && ack.isUnmatchedAck()) {
715 command.setAck(UNMATCHED);
716 } else {
717 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
718 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
719 }
720 store(command, false, null, null);
721 }
722
723 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
724 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
725 .getSubscriptionName());
726 KahaSubscriptionCommand command = new KahaSubscriptionCommand();
727 command.setDestination(dest);
728 command.setSubscriptionKey(subscriptionKey.toString());
729 command.setRetroactive(retroactive);
730 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
731 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
732 store(command, isEnableJournalDiskSyncs() && true, null, null);
733 this.subscriptionCount.incrementAndGet();
734 }
735
736 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
737 KahaSubscriptionCommand command = new KahaSubscriptionCommand();
738 command.setDestination(dest);
739 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
740 store(command, isEnableJournalDiskSyncs() && true, null, null);
741 this.subscriptionCount.decrementAndGet();
742 }
743
744 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
745
746 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
747 indexLock.writeLock().lock();
748 try {
749 pageFile.tx().execute(new Transaction.Closure<IOException>() {
750 public void execute(Transaction tx) throws IOException {
751 StoredDestination sd = getStoredDestination(dest, tx);
752 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
753 .hasNext();) {
754 Entry<String, KahaSubscriptionCommand> entry = iterator.next();
755 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
756 .getValue().getSubscriptionInfo().newInput()));
757 subscriptions.add(info);
758
759 }
760 }
761 });
762 }finally {
763 indexLock.writeLock().unlock();
764 }
765
766 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
767 subscriptions.toArray(rc);
768 return rc;
769 }
770
771 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
772 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
773 indexLock.writeLock().lock();
774 try {
775 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
776 public SubscriptionInfo execute(Transaction tx) throws IOException {
777 StoredDestination sd = getStoredDestination(dest, tx);
778 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
779 if (command == null) {
780 return null;
781 }
782 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
783 .getSubscriptionInfo().newInput()));
784 }
785 });
786 }finally {
787 indexLock.writeLock().unlock();
788 }
789 }
790
791 public int getMessageCount(String clientId, String subscriptionName) throws IOException {
792 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
793 indexLock.writeLock().lock();
794 try {
795 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
796 public Integer execute(Transaction tx) throws IOException {
797 StoredDestination sd = getStoredDestination(dest, tx);
798 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
799 if (cursorPos == null) {
800 // The subscription might not exist.
801 return 0;
802 }
803
804 return (int) getStoredMessageCount(tx, sd, subscriptionKey);
805 }
806 });
807 }finally {
808 indexLock.writeLock().unlock();
809 }
810 }
811
812 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
813 throws Exception {
814 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
815 @SuppressWarnings("unused")
816 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
817 indexLock.writeLock().lock();
818 try {
819 pageFile.tx().execute(new Transaction.Closure<Exception>() {
820 public void execute(Transaction tx) throws Exception {
821 StoredDestination sd = getStoredDestination(dest, tx);
822 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
823 sd.orderIndex.setBatch(tx, cursorPos);
824 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
825 .hasNext();) {
826 Entry<Long, MessageKeys> entry = iterator.next();
827 if (ackedAndPrepared.contains(entry.getValue().messageId)) {
828 continue;
829 }
830 listener.recoverMessage(loadMessage(entry.getValue().location));
831 }
832 sd.orderIndex.resetCursorPosition();
833 }
834 });
835 }finally {
836 indexLock.writeLock().unlock();
837 }
838 }
839
840 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
841 final MessageRecoveryListener listener) throws Exception {
842 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
843 @SuppressWarnings("unused")
844 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
845 indexLock.writeLock().lock();
846 try {
847 pageFile.tx().execute(new Transaction.Closure<Exception>() {
848 public void execute(Transaction tx) throws Exception {
849 StoredDestination sd = getStoredDestination(dest, tx);
850 sd.orderIndex.resetCursorPosition();
851 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
852 if (moc == null) {
853 LastAck pos = getLastAck(tx, sd, subscriptionKey);
854 if (pos == null) {
855 // sub deleted
856 return;
857 }
858 sd.orderIndex.setBatch(tx, pos);
859 moc = sd.orderIndex.cursor;
860 } else {
861 sd.orderIndex.cursor.sync(moc);
862 }
863
864 Entry<Long, MessageKeys> entry = null;
865 int counter = 0;
866 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
867 .hasNext();) {
868 entry = iterator.next();
869 if (ackedAndPrepared.contains(entry.getValue().messageId)) {
870 continue;
871 }
872 if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
873 counter++;
874 }
875 if (counter >= maxReturned || listener.hasSpace() == false) {
876 break;
877 }
878 }
879 sd.orderIndex.stoppedIterating();
880 if (entry != null) {
881 MessageOrderCursor copy = sd.orderIndex.cursor.copy();
882 sd.subscriptionCursors.put(subscriptionKey, copy);
883 }
884 }
885 });
886 }finally {
887 indexLock.writeLock().unlock();
888 }
889 }
890
891 public void resetBatching(String clientId, String subscriptionName) {
892 try {
893 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
894 indexLock.writeLock().lock();
895 try {
896 pageFile.tx().execute(new Transaction.Closure<IOException>() {
897 public void execute(Transaction tx) throws IOException {
898 StoredDestination sd = getStoredDestination(dest, tx);
899 sd.subscriptionCursors.remove(subscriptionKey);
900 }
901 });
902 }finally {
903 indexLock.writeLock().unlock();
904 }
905 } catch (IOException e) {
906 throw new RuntimeException(e);
907 }
908 }
909 }
910
911 String subscriptionKey(String clientId, String subscriptionName) {
912 return clientId + ":" + subscriptionName;
913 }
914
915 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
916 return this.transactionStore.proxy(new KahaDBMessageStore(destination));
917 }
918
919 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
920 return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
921 }
922
923 /**
924 * Cleanup method to remove any state associated with the given destination.
925 * This method does not stop the message store (it might not be cached).
926 *
927 * @param destination
928 * Destination to forget
929 */
930 public void removeQueueMessageStore(ActiveMQQueue destination) {
931 }
932
933 /**
934 * Cleanup method to remove any state associated with the given destination
935 * This method does not stop the message store (it might not be cached).
936 *
937 * @param destination
938 * Destination to forget
939 */
940 public void removeTopicMessageStore(ActiveMQTopic destination) {
941 }
942
943 public void deleteAllMessages() throws IOException {
944 deleteAllMessages = true;
945 }
946
947 public Set<ActiveMQDestination> getDestinations() {
948 try {
949 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
950 indexLock.writeLock().lock();
951 try {
952 pageFile.tx().execute(new Transaction.Closure<IOException>() {
953 public void execute(Transaction tx) throws IOException {
954 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
955 .hasNext();) {
956 Entry<String, StoredDestination> entry = iterator.next();
957 if (!isEmptyTopic(entry, tx)) {
958 rc.add(convert(entry.getKey()));
959 }
960 }
961 }
962
963 private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
964 throws IOException {
965 boolean isEmptyTopic = false;
966 ActiveMQDestination dest = convert(entry.getKey());
967 if (dest.isTopic()) {
968 StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
969 if (loadedStore.subscriptionAcks.isEmpty(tx)) {
970 isEmptyTopic = true;
971 }
972 }
973 return isEmptyTopic;
974 }
975 });
976 }finally {
977 indexLock.writeLock().unlock();
978 }
979 return rc;
980 } catch (IOException e) {
981 throw new RuntimeException(e);
982 }
983 }
984
985 public long getLastMessageBrokerSequenceId() throws IOException {
986 return 0;
987 }
988
989 public long getLastProducerSequenceId(ProducerId id) {
990 indexLock.readLock().lock();
991 try {
992 return metadata.producerSequenceIdTracker.getLastSeqId(id);
993 } finally {
994 indexLock.readLock().unlock();
995 }
996 }
997
998 public long size() {
999 try {
1000 return journalSize.get() + getPageFile().getDiskSize();
1001 } catch (IOException e) {
1002 throw new RuntimeException(e);
1003 }
1004 }
1005
1006 public void beginTransaction(ConnectionContext context) throws IOException {
1007 throw new IOException("Not yet implemented.");
1008 }
1009 public void commitTransaction(ConnectionContext context) throws IOException {
1010 throw new IOException("Not yet implemented.");
1011 }
1012 public void rollbackTransaction(ConnectionContext context) throws IOException {
1013 throw new IOException("Not yet implemented.");
1014 }
1015
1016 public void checkpoint(boolean sync) throws IOException {
1017 super.checkpointCleanup(sync);
1018 }
1019
1020 // /////////////////////////////////////////////////////////////////
1021 // Internal helper methods.
1022 // /////////////////////////////////////////////////////////////////
1023
1024 /**
1025 * @param location
1026 * @return
1027 * @throws IOException
1028 */
1029 Message loadMessage(Location location) throws IOException {
1030 KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
1031 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1032 return msg;
1033 }
1034
1035 // /////////////////////////////////////////////////////////////////
1036 // Internal conversion methods.
1037 // /////////////////////////////////////////////////////////////////
1038
1039 KahaLocation convert(Location location) {
1040 KahaLocation rc = new KahaLocation();
1041 rc.setLogId(location.getDataFileId());
1042 rc.setOffset(location.getOffset());
1043 return rc;
1044 }
1045
1046 KahaDestination convert(ActiveMQDestination dest) {
1047 KahaDestination rc = new KahaDestination();
1048 rc.setName(dest.getPhysicalName());
1049 switch (dest.getDestinationType()) {
1050 case ActiveMQDestination.QUEUE_TYPE:
1051 rc.setType(DestinationType.QUEUE);
1052 return rc;
1053 case ActiveMQDestination.TOPIC_TYPE:
1054 rc.setType(DestinationType.TOPIC);
1055 return rc;
1056 case ActiveMQDestination.TEMP_QUEUE_TYPE:
1057 rc.setType(DestinationType.TEMP_QUEUE);
1058 return rc;
1059 case ActiveMQDestination.TEMP_TOPIC_TYPE:
1060 rc.setType(DestinationType.TEMP_TOPIC);
1061 return rc;
1062 default:
1063 return null;
1064 }
1065 }
1066
1067 ActiveMQDestination convert(String dest) {
1068 int p = dest.indexOf(":");
1069 if (p < 0) {
1070 throw new IllegalArgumentException("Not in the valid destination format");
1071 }
1072 int type = Integer.parseInt(dest.substring(0, p));
1073 String name = dest.substring(p + 1);
1074 return convert(type, name);
1075 }
1076
1077 private ActiveMQDestination convert(KahaDestination commandDestination) {
1078 return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1079 }
1080
1081 private ActiveMQDestination convert(int type, String name) {
1082 switch (KahaDestination.DestinationType.valueOf(type)) {
1083 case QUEUE:
1084 return new ActiveMQQueue(name);
1085 case TOPIC:
1086 return new ActiveMQTopic(name);
1087 case TEMP_QUEUE:
1088 return new ActiveMQTempQueue(name);
1089 case TEMP_TOPIC:
1090 return new ActiveMQTempTopic(name);
1091 default:
1092 throw new IllegalArgumentException("Not in the valid destination format");
1093 }
1094 }
1095
1096 public TransactionIdTransformer getTransactionIdTransformer() {
1097 return transactionIdTransformer;
1098 }
1099
1100 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1101 this.transactionIdTransformer = transactionIdTransformer;
1102 }
1103
1104 static class AsyncJobKey {
1105 MessageId id;
1106 ActiveMQDestination destination;
1107
1108 AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1109 this.id = id;
1110 this.destination = destination;
1111 }
1112
1113 @Override
1114 public boolean equals(Object obj) {
1115 if (obj == this) {
1116 return true;
1117 }
1118 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1119 && destination.equals(((AsyncJobKey) obj).destination);
1120 }
1121
1122 @Override
1123 public int hashCode() {
1124 return id.hashCode() + destination.hashCode();
1125 }
1126
1127 @Override
1128 public String toString() {
1129 return destination.getPhysicalName() + "-" + id;
1130 }
1131 }
1132
/**
 * Contract shared by the asynchronous store tasks in this file: a task can be
 * cancelled, and it brackets its work with a lock acquire/release pair.
 */
public interface StoreTask {
    /**
     * Attempts to cancel the task.
     *
     * @return whether the cancellation took effect
     */
    boolean cancel();

    /** Acquires whatever locks the task needs before it is run. */
    void aquireLocks();

    /** Releases what {@link #aquireLocks()} acquired. */
    void releaseLocks();
}
1140
1141 class StoreQueueTask implements Runnable, StoreTask {
1142 protected final Message message;
1143 protected final ConnectionContext context;
1144 protected final KahaDBMessageStore store;
1145 protected final InnerFutureTask future;
1146 protected final AtomicBoolean done = new AtomicBoolean();
1147 protected final AtomicBoolean locked = new AtomicBoolean();
1148
1149 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1150 this.store = store;
1151 this.context = context;
1152 this.message = message;
1153 this.future = new InnerFutureTask(this);
1154 }
1155
1156 public Future<Object> getFuture() {
1157 return this.future;
1158 }
1159
1160 public boolean cancel() {
1161 if (this.done.compareAndSet(false, true)) {
1162 return this.future.cancel(false);
1163 }
1164 return false;
1165 }
1166
1167 public void aquireLocks() {
1168 if (this.locked.compareAndSet(false, true)) {
1169 try {
1170 globalQueueSemaphore.acquire();
1171 store.acquireLocalAsyncLock();
1172 message.incrementReferenceCount();
1173 } catch (InterruptedException e) {
1174 LOG.warn("Failed to aquire lock", e);
1175 }
1176 }
1177
1178 }
1179
1180 public void releaseLocks() {
1181 if (this.locked.compareAndSet(true, false)) {
1182 store.releaseLocalAsyncLock();
1183 globalQueueSemaphore.release();
1184 message.decrementReferenceCount();
1185 }
1186 }
1187
1188 public void run() {
1189 this.store.doneTasks++;
1190 try {
1191 if (this.done.compareAndSet(false, true)) {
1192 this.store.addMessage(context, message);
1193 removeQueueTask(this.store, this.message.getMessageId());
1194 this.future.complete();
1195 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1196 System.err.println(this.store.dest.getName() + " cancelled: "
1197 + (this.store.canceledTasks / this.store.doneTasks) * 100);
1198 this.store.canceledTasks = this.store.doneTasks = 0;
1199 }
1200 } catch (Exception e) {
1201 this.future.setException(e);
1202 }
1203 }
1204
1205 protected Message getMessage() {
1206 return this.message;
1207 }
1208
1209 private class InnerFutureTask extends FutureTask<Object> {
1210
1211 public InnerFutureTask(Runnable runnable) {
1212 super(runnable, null);
1213
1214 }
1215
1216 public void setException(final Exception e) {
1217 super.setException(e);
1218 }
1219
1220 public void complete() {
1221 super.set(null);
1222 }
1223 }
1224 }
1225
1226 class StoreTopicTask extends StoreQueueTask {
1227 private final int subscriptionCount;
1228 private final List<String> subscriptionKeys = new ArrayList<String>(1);
1229 private final KahaDBTopicMessageStore topicStore;
1230 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1231 int subscriptionCount) {
1232 super(store, context, message);
1233 this.topicStore = store;
1234 this.subscriptionCount = subscriptionCount;
1235
1236 }
1237
1238 @Override
1239 public void aquireLocks() {
1240 if (this.locked.compareAndSet(false, true)) {
1241 try {
1242 globalTopicSemaphore.acquire();
1243 store.acquireLocalAsyncLock();
1244 message.incrementReferenceCount();
1245 } catch (InterruptedException e) {
1246 LOG.warn("Failed to aquire lock", e);
1247 }
1248 }
1249
1250 }
1251
1252 @Override
1253 public void releaseLocks() {
1254 if (this.locked.compareAndSet(true, false)) {
1255 message.decrementReferenceCount();
1256 store.releaseLocalAsyncLock();
1257 globalTopicSemaphore.release();
1258 }
1259 }
1260
1261 /**
1262 * add a key
1263 *
1264 * @param key
1265 * @return true if all acknowledgements received
1266 */
1267 public boolean addSubscriptionKey(String key) {
1268 synchronized (this.subscriptionKeys) {
1269 this.subscriptionKeys.add(key);
1270 }
1271 return this.subscriptionKeys.size() >= this.subscriptionCount;
1272 }
1273
1274 @Override
1275 public void run() {
1276 this.store.doneTasks++;
1277 try {
1278 if (this.done.compareAndSet(false, true)) {
1279 this.topicStore.addMessage(context, message);
1280 // apply any acks we have
1281 synchronized (this.subscriptionKeys) {
1282 for (String key : this.subscriptionKeys) {
1283 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1284
1285 }
1286 }
1287 removeTopicTask(this.topicStore, this.message.getMessageId());
1288 this.future.complete();
1289 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1290 System.err.println(this.store.dest.getName() + " cancelled: "
1291 + (this.store.canceledTasks / this.store.doneTasks) * 100);
1292 this.store.canceledTasks = this.store.doneTasks = 0;
1293 }
1294 } catch (Exception e) {
1295 this.future.setException(e);
1296 }
1297 }
1298 }
1299
1300 public class StoreTaskExecutor extends ThreadPoolExecutor {
1301
1302 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1303 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1304 }
1305
1306 protected void afterExecute(Runnable runnable, Throwable throwable) {
1307 super.afterExecute(runnable, throwable);
1308
1309 if (runnable instanceof StoreTask) {
1310 ((StoreTask)runnable).releaseLocks();
1311 }
1312
1313 }
1314 }
1315 }