001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.util.ArrayList;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.LinkedList;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.Map.Entry;
031    import java.util.concurrent.*;
032    import java.util.concurrent.atomic.AtomicBoolean;
033    import java.util.concurrent.atomic.AtomicInteger;
034    import org.apache.activemq.broker.ConnectionContext;
035    import org.apache.activemq.broker.region.Destination;
036    import org.apache.activemq.broker.region.RegionBroker;
037    import org.apache.activemq.command.ActiveMQDestination;
038    import org.apache.activemq.command.ActiveMQQueue;
039    import org.apache.activemq.command.ActiveMQTempQueue;
040    import org.apache.activemq.command.ActiveMQTempTopic;
041    import org.apache.activemq.command.ActiveMQTopic;
042    import org.apache.activemq.command.Message;
043    import org.apache.activemq.command.MessageAck;
044    import org.apache.activemq.command.MessageId;
045    import org.apache.activemq.command.ProducerId;
046    import org.apache.activemq.command.SubscriptionInfo;
047    import org.apache.activemq.command.TransactionId;
048    import org.apache.activemq.openwire.OpenWireFormat;
049    import org.apache.activemq.protobuf.Buffer;
050    import org.apache.activemq.store.AbstractMessageStore;
051    import org.apache.activemq.store.MessageRecoveryListener;
052    import org.apache.activemq.store.MessageStore;
053    import org.apache.activemq.store.PersistenceAdapter;
054    import org.apache.activemq.store.TopicMessageStore;
055    import org.apache.activemq.store.TransactionStore;
056    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
057    import org.apache.activemq.store.kahadb.data.KahaDestination;
058    import org.apache.activemq.store.kahadb.data.KahaLocation;
059    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
060    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
061    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
062    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
063    import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
064    import org.apache.activemq.usage.MemoryUsage;
065    import org.apache.activemq.usage.SystemUsage;
066    import org.apache.activemq.util.ServiceStopper;
067    import org.apache.activemq.util.ThreadPoolUtils;
068    import org.apache.activemq.wireformat.WireFormat;
069    import org.slf4j.Logger;
070    import org.slf4j.LoggerFactory;
071    import org.apache.kahadb.journal.Location;
072    import org.apache.kahadb.page.Transaction;
073    
074    public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
075        static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
076        private static final int MAX_ASYNC_JOBS = 10000;
077    
078        public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
079        public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
080                PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
081        public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
082        private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
083                PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
084    
085        protected ExecutorService queueExecutor;
086        protected ExecutorService topicExecutor;
087        protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
088        protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
089        final WireFormat wireFormat = new OpenWireFormat();
090        private SystemUsage usageManager;
091        private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
092        private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
093        Semaphore globalQueueSemaphore;
094        Semaphore globalTopicSemaphore;
095        private boolean concurrentStoreAndDispatchQueues = true;
096        // when true, message order may be compromised when cache is exhausted if store is out
097        // or order w.r.t cache
098        private boolean concurrentStoreAndDispatchTopics = false;
099        private boolean concurrentStoreAndDispatchTransactions = false;
100        private int maxAsyncJobs = MAX_ASYNC_JOBS;
101        private final KahaDBTransactionStore transactionStore;
102        private TransactionIdTransformer transactionIdTransformer;
103    
104        public KahaDBStore() {
105            this.transactionStore = new KahaDBTransactionStore(this);
106            this.transactionIdTransformer = new TransactionIdTransformer() {
107                @Override
108                public KahaTransactionInfo transform(TransactionId txid) {
109                    return TransactionIdConversion.convert(txid);
110                }
111            };
112        }
113    
114        @Override
115        public String toString() {
116            return "KahaDB:[" + directory.getAbsolutePath() + "]";
117        }
118    
119        public void setBrokerName(String brokerName) {
120        }
121    
122        public void setUsageManager(SystemUsage usageManager) {
123            this.usageManager = usageManager;
124        }
125    
126        public SystemUsage getUsageManager() {
127            return this.usageManager;
128        }
129    
130        /**
131         * @return the concurrentStoreAndDispatch
132         */
133        public boolean isConcurrentStoreAndDispatchQueues() {
134            return this.concurrentStoreAndDispatchQueues;
135        }
136    
137        /**
138         * @param concurrentStoreAndDispatch
139         *            the concurrentStoreAndDispatch to set
140         */
141        public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
142            this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
143        }
144    
145        /**
146         * @return the concurrentStoreAndDispatch
147         */
148        public boolean isConcurrentStoreAndDispatchTopics() {
149            return this.concurrentStoreAndDispatchTopics;
150        }
151    
152        /**
153         * @param concurrentStoreAndDispatch
154         *            the concurrentStoreAndDispatch to set
155         */
156        public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
157            this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
158        }
159    
160        public boolean isConcurrentStoreAndDispatchTransactions() {
161            return this.concurrentStoreAndDispatchTransactions;
162        }
163    
164        /**
165         * @return the maxAsyncJobs
166         */
167        public int getMaxAsyncJobs() {
168            return this.maxAsyncJobs;
169        }
170        /**
171         * @param maxAsyncJobs
172         *            the maxAsyncJobs to set
173         */
174        public void setMaxAsyncJobs(int maxAsyncJobs) {
175            this.maxAsyncJobs = maxAsyncJobs;
176        }
177    
178        @Override
179        public void doStart() throws Exception {
180            super.doStart();
181            this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
182            this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
183            this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
184            this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
185            this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
186                    asyncQueueJobQueue, new ThreadFactory() {
187                        public Thread newThread(Runnable runnable) {
188                            Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
189                            thread.setDaemon(true);
190                            return thread;
191                        }
192                    });
193            this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
194                    asyncTopicJobQueue, new ThreadFactory() {
195                        public Thread newThread(Runnable runnable) {
196                            Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
197                            thread.setDaemon(true);
198                            return thread;
199                        }
200                    });
201        }
202    
203        @Override
204        public void doStop(ServiceStopper stopper) throws Exception {
205            // drain down async jobs
206            LOG.info("Stopping async queue tasks");
207            if (this.globalQueueSemaphore != null) {
208                this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
209            }
210            synchronized (this.asyncQueueMaps) {
211                for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
212                    synchronized (m) {
213                        for (StoreTask task : m.values()) {
214                            task.cancel();
215                        }
216                    }
217                }
218                this.asyncQueueMaps.clear();
219            }
220            LOG.info("Stopping async topic tasks");
221            if (this.globalTopicSemaphore != null) {
222                this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
223            }
224            synchronized (this.asyncTopicMaps) {
225                for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
226                    synchronized (m) {
227                        for (StoreTask task : m.values()) {
228                            task.cancel();
229                        }
230                    }
231                }
232                this.asyncTopicMaps.clear();
233            }
234            if (this.globalQueueSemaphore != null) {
235                this.globalQueueSemaphore.drainPermits();
236            }
237            if (this.globalTopicSemaphore != null) {
238                this.globalTopicSemaphore.drainPermits();
239            }
240            if (this.queueExecutor != null) {
241                ThreadPoolUtils.shutdownNow(queueExecutor);
242                queueExecutor = null;
243            }
244            if (this.topicExecutor != null) {
245                ThreadPoolUtils.shutdownNow(topicExecutor);
246                topicExecutor = null;
247            }
248            LOG.info("Stopped KahaDB");
249            super.doStop(stopper);
250        }
251    
252        void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
253            Location location;
254            this.indexLock.writeLock().lock();
255            try {
256                  location = findMessageLocation(key, destination);
257            } finally {
258                this.indexLock.writeLock().unlock();
259            }
260    
261            if (location != null) {
262                KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
263                Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
264    
265                message.incrementRedeliveryCounter();
266                if (LOG.isTraceEnabled()) {
267                    LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
268                }
269                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
270                addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
271    
272                final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
273    
274                this.indexLock.writeLock().lock();
275                try {
276                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
277                        public void execute(Transaction tx) throws IOException {
278                            StoredDestination sd = getStoredDestination(destination, tx);
279                            Long sequence = sd.messageIdIndex.get(tx, key);
280                            MessageKeys keys = sd.orderIndex.get(tx, sequence);
281                            sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
282                        }
283                    });
284                } finally {
285                    this.indexLock.writeLock().unlock();
286                }
287            }
288        }
289    
290        @Override
291        void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
292            if (brokerService != null) {
293                RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
294                if (regionBroker != null) {
295                    Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination));
296                    for (Destination destination : destinationSet) {
297                        destination.getDestinationStatistics().getMessages().decrement();
298                        destination.getDestinationStatistics().getEnqueues().decrement();
299                    }
300                }
301            }
302        }
303    
304        private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
305            return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
306                public Location execute(Transaction tx) throws IOException {
307                    StoredDestination sd = getStoredDestination(destination, tx);
308                    Long sequence = sd.messageIdIndex.get(tx, key);
309                    if (sequence == null) {
310                        return null;
311                    }
312                    return sd.orderIndex.get(tx, sequence).location;
313                }
314            });
315        }
316    
317        protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
318            StoreQueueTask task = null;
319            synchronized (store.asyncTaskMap) {
320                task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
321            }
322            return task;
323        }
324    
325        protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
326            synchronized (store.asyncTaskMap) {
327                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
328            }
329            this.queueExecutor.execute(task);
330        }
331    
332        protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
333            StoreTopicTask task = null;
334            synchronized (store.asyncTaskMap) {
335                task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
336            }
337            return task;
338        }
339    
340        protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
341            synchronized (store.asyncTaskMap) {
342                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
343            }
344            this.topicExecutor.execute(task);
345        }
346    
347        public TransactionStore createTransactionStore() throws IOException {
348            return this.transactionStore;
349        }
350    
351        public boolean getForceRecoverIndex() {
352            return this.forceRecoverIndex;
353        }
354    
355        public void setForceRecoverIndex(boolean forceRecoverIndex) {
356            this.forceRecoverIndex = forceRecoverIndex;
357        }
358    
359        public class KahaDBMessageStore extends AbstractMessageStore {
360            protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
361            protected KahaDestination dest;
362            private final int maxAsyncJobs;
363            private final Semaphore localDestinationSemaphore;
364    
365            double doneTasks, canceledTasks = 0;
366    
367            public KahaDBMessageStore(ActiveMQDestination destination) {
368                super(destination);
369                this.dest = convert(destination);
370                this.maxAsyncJobs = getMaxAsyncJobs();
371                this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
372            }
373    
374            @Override
375            public ActiveMQDestination getDestination() {
376                return destination;
377            }
378    
379            @Override
380            public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
381                    throws IOException {
382                if (isConcurrentStoreAndDispatchQueues()) {
383                    StoreQueueTask result = new StoreQueueTask(this, context, message);
384                    result.aquireLocks();
385                    addQueueTask(this, result);
386                    return result.getFuture();
387                } else {
388                    return super.asyncAddQueueMessage(context, message);
389                }
390            }
391    
392            @Override
393            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
394                if (isConcurrentStoreAndDispatchQueues()) {
395                    AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
396                    StoreQueueTask task = null;
397                    synchronized (asyncTaskMap) {
398                        task = (StoreQueueTask) asyncTaskMap.get(key);
399                    }
400                    if (task != null) {
401                        if (!task.cancel()) {
402                            try {
403    
404                                task.future.get();
405                            } catch (InterruptedException e) {
406                                throw new InterruptedIOException(e.toString());
407                            } catch (Exception ignored) {
408                                LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
409                            }
410                            removeMessage(context, ack);
411                        } else {
412                            synchronized (asyncTaskMap) {
413                                asyncTaskMap.remove(key);
414                            }
415                        }
416                    } else {
417                        removeMessage(context, ack);
418                    }
419                } else {
420                    removeMessage(context, ack);
421                }
422            }
423    
424            public void addMessage(ConnectionContext context, Message message) throws IOException {
425                KahaAddMessageCommand command = new KahaAddMessageCommand();
426                command.setDestination(dest);
427                command.setMessageId(message.getMessageId().toString());
428                command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
429                command.setPriority(message.getPriority());
430                command.setPrioritySupported(isPrioritizedMessages());
431                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
432                command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
433                store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
434    
435            }
436    
437            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
438                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
439                command.setDestination(dest);
440                command.setMessageId(ack.getLastMessageId().toString());
441                command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
442    
443                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
444                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
445                store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
446            }
447    
448            public void removeAllMessages(ConnectionContext context) throws IOException {
449                KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
450                command.setDestination(dest);
451                store(command, true, null, null);
452            }
453    
454            public Message getMessage(MessageId identity) throws IOException {
455                final String key = identity.toString();
456    
457                // Hopefully one day the page file supports concurrent read
458                // operations... but for now we must
459                // externally synchronize...
460                Location location;
461                indexLock.writeLock().lock();
462                try {
463                    location = findMessageLocation(key, dest);
464                }finally {
465                    indexLock.writeLock().unlock();
466                }
467                if (location == null) {
468                    return null;
469                }
470    
471                return loadMessage(location);
472            }
473    
474            public int getMessageCount() throws IOException {
475                try {
476                    lockAsyncJobQueue();
477                    indexLock.writeLock().lock();
478                    try {
479                        return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
480                            public Integer execute(Transaction tx) throws IOException {
481                                // Iterate through all index entries to get a count
482                                // of
483                                // messages in the destination.
484                                StoredDestination sd = getStoredDestination(dest, tx);
485                                int rc = 0;
486                                for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
487                                        .hasNext();) {
488                                    iterator.next();
489                                    rc++;
490                                }
491                                return rc;
492                            }
493                        });
494                    }finally {
495                        indexLock.writeLock().unlock();
496                    }
497                } finally {
498                    unlockAsyncJobQueue();
499                }
500            }
501    
502            @Override
503            public boolean isEmpty() throws IOException {
504                indexLock.writeLock().lock();
505                try {
506                    return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
507                        public Boolean execute(Transaction tx) throws IOException {
508                            // Iterate through all index entries to get a count of
509                            // messages in the destination.
510                            StoredDestination sd = getStoredDestination(dest, tx);
511                            return sd.locationIndex.isEmpty(tx);
512                        }
513                    });
514                }finally {
515                    indexLock.writeLock().unlock();
516                }
517            }
518    
519            public void recover(final MessageRecoveryListener listener) throws Exception {
520                // recovery may involve expiry which will modify
521                indexLock.writeLock().lock();
522                try {
523                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
524                        public void execute(Transaction tx) throws Exception {
525                            StoredDestination sd = getStoredDestination(dest, tx);
526                            sd.orderIndex.resetCursorPosition();
527                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
528                                    .hasNext(); ) {
529                                Entry<Long, MessageKeys> entry = iterator.next();
530                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
531                                    continue;
532                                }
533                                Message msg = loadMessage(entry.getValue().location);
534                                listener.recoverMessage(msg);
535                            }
536                        }
537                    });
538                }finally {
539                    indexLock.writeLock().unlock();
540                }
541            }
542    
543    
544            public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
545                indexLock.writeLock().lock();
546                try {
547                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
548                        public void execute(Transaction tx) throws Exception {
549                            StoredDestination sd = getStoredDestination(dest, tx);
550                            Entry<Long, MessageKeys> entry = null;
551                            int counter = 0;
552                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
553                                 listener.hasSpace() && iterator.hasNext(); ) {
554                                entry = iterator.next();
555                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
556                                    continue;
557                                }
558                                Message msg = loadMessage(entry.getValue().location);
559                                listener.recoverMessage(msg);
560                                counter++;
561                                if (counter >= maxReturned) {
562                                    break;
563                                }
564                            }
565                            sd.orderIndex.stoppedIterating();
566                        }
567                    });
568                }finally {
569                    indexLock.writeLock().unlock();
570                }
571            }
572    
573            public void resetBatching() {
574                if (pageFile.isLoaded()) {
575                    indexLock.writeLock().lock();
576                    try {
577                        pageFile.tx().execute(new Transaction.Closure<Exception>() {
578                            public void execute(Transaction tx) throws Exception {
579                                StoredDestination sd = getExistingStoredDestination(dest, tx);
580                                if (sd != null) {
581                                    sd.orderIndex.resetCursorPosition();}
582                                }
583                            });
584                    } catch (Exception e) {
585                        LOG.error("Failed to reset batching",e);
586                    }finally {
587                        indexLock.writeLock().unlock();
588                    }
589                }
590            }
591    
592            @Override
593            public void setBatch(MessageId identity) throws IOException {
594                try {
595                    final String key = identity.toString();
596                    lockAsyncJobQueue();
597    
598                    // Hopefully one day the page file supports concurrent read
599                    // operations... but for now we must
600                    // externally synchronize...
601    
602                    indexLock.writeLock().lock();
603                    try {
604                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
605                            public void execute(Transaction tx) throws IOException {
606                                StoredDestination sd = getStoredDestination(dest, tx);
607                                Long location = sd.messageIdIndex.get(tx, key);
608                                if (location != null) {
609                                    sd.orderIndex.setBatch(tx, location);
610                                }
611                            }
612                        });
613                    } finally {
614                        indexLock.writeLock().unlock();
615                    }
616                } finally {
617                    unlockAsyncJobQueue();
618                }
619            }
620    
621            @Override
622            public void setMemoryUsage(MemoryUsage memoeyUSage) {
623            }
624            @Override
625            public void start() throws Exception {
626                super.start();
627            }
628            @Override
629            public void stop() throws Exception {
630                super.stop();
631            }
632    
633            protected void lockAsyncJobQueue() {
634                try {
635                    this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
636                } catch (Exception e) {
637                    LOG.error("Failed to lock async jobs for " + this.destination, e);
638                }
639            }
640    
641            protected void unlockAsyncJobQueue() {
642                this.localDestinationSemaphore.release(this.maxAsyncJobs);
643            }
644    
645            protected void acquireLocalAsyncLock() {
646                try {
647                    this.localDestinationSemaphore.acquire();
648                } catch (InterruptedException e) {
649                    LOG.error("Failed to aquire async lock for " + this.destination, e);
650                }
651            }
652    
653            protected void releaseLocalAsyncLock() {
654                this.localDestinationSemaphore.release();
655            }
656    
657        }
658    
659        class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
660            private final AtomicInteger subscriptionCount = new AtomicInteger();
661            public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
662                super(destination);
663                this.subscriptionCount.set(getAllSubscriptions().length);
664                asyncTopicMaps.add(asyncTaskMap);
665            }
666    
667            @Override
668            public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
669                    throws IOException {
670                if (isConcurrentStoreAndDispatchTopics()) {
671                    StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
672                    result.aquireLocks();
673                    addTopicTask(this, result);
674                    return result.getFuture();
675                } else {
676                    return super.asyncAddTopicMessage(context, message);
677                }
678            }
679    
680            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
681                                    MessageId messageId, MessageAck ack)
682                    throws IOException {
683                String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
684                if (isConcurrentStoreAndDispatchTopics()) {
685                    AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
686                    StoreTopicTask task = null;
687                    synchronized (asyncTaskMap) {
688                        task = (StoreTopicTask) asyncTaskMap.get(key);
689                    }
690                    if (task != null) {
691                        if (task.addSubscriptionKey(subscriptionKey)) {
692                            removeTopicTask(this, messageId);
693                            if (task.cancel()) {
694                                synchronized (asyncTaskMap) {
695                                    asyncTaskMap.remove(key);
696                                }
697                            }
698                        }
699                    } else {
700                        doAcknowledge(context, subscriptionKey, messageId, ack);
701                    }
702                } else {
703                    doAcknowledge(context, subscriptionKey, messageId, ack);
704                }
705            }
706    
707            protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
708                    throws IOException {
709                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
710                command.setDestination(dest);
711                command.setSubscriptionKey(subscriptionKey);
712                command.setMessageId(messageId.toString());
713                command.setTransactionInfo(ack != null ? transactionIdTransformer.transform(ack.getTransactionId()) : null);
714                if (ack != null && ack.isUnmatchedAck()) {
715                    command.setAck(UNMATCHED);
716                } else {
717                    org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
718                    command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
719                }
720                store(command, false, null, null);
721            }
722    
723            public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
724                String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
725                        .getSubscriptionName());
726                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
727                command.setDestination(dest);
728                command.setSubscriptionKey(subscriptionKey.toString());
729                command.setRetroactive(retroactive);
730                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
731                command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
732                store(command, isEnableJournalDiskSyncs() && true, null, null);
733                this.subscriptionCount.incrementAndGet();
734            }
735    
736            public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
737                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
738                command.setDestination(dest);
739                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
740                store(command, isEnableJournalDiskSyncs() && true, null, null);
741                this.subscriptionCount.decrementAndGet();
742            }
743    
744            public SubscriptionInfo[] getAllSubscriptions() throws IOException {
745    
746                final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
747                indexLock.writeLock().lock();
748                try {
749                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
750                        public void execute(Transaction tx) throws IOException {
751                            StoredDestination sd = getStoredDestination(dest, tx);
752                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
753                                    .hasNext();) {
754                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
755                                SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
756                                        .getValue().getSubscriptionInfo().newInput()));
757                                subscriptions.add(info);
758    
759                            }
760                        }
761                    });
762                }finally {
763                    indexLock.writeLock().unlock();
764                }
765    
766                SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
767                subscriptions.toArray(rc);
768                return rc;
769            }
770    
771            public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
772                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
773                indexLock.writeLock().lock();
774                try {
775                    return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
776                        public SubscriptionInfo execute(Transaction tx) throws IOException {
777                            StoredDestination sd = getStoredDestination(dest, tx);
778                            KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
779                            if (command == null) {
780                                return null;
781                            }
782                            return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
783                                    .getSubscriptionInfo().newInput()));
784                        }
785                    });
786                }finally {
787                    indexLock.writeLock().unlock();
788                }
789            }
790    
791            public int getMessageCount(String clientId, String subscriptionName) throws IOException {
792                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
793                indexLock.writeLock().lock();
794                try {
795                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
796                        public Integer execute(Transaction tx) throws IOException {
797                            StoredDestination sd = getStoredDestination(dest, tx);
798                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
799                            if (cursorPos == null) {
800                                // The subscription might not exist.
801                                return 0;
802                            }
803    
804                            return (int) getStoredMessageCount(tx, sd, subscriptionKey);
805                        }
806                    });
807                }finally {
808                    indexLock.writeLock().unlock();
809                }
810            }
811    
812            public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
813                    throws Exception {
814                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
815                @SuppressWarnings("unused")
816                final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
817                indexLock.writeLock().lock();
818                try {
819                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
820                        public void execute(Transaction tx) throws Exception {
821                            StoredDestination sd = getStoredDestination(dest, tx);
822                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
823                            sd.orderIndex.setBatch(tx, cursorPos);
824                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
825                                    .hasNext();) {
826                                Entry<Long, MessageKeys> entry = iterator.next();
827                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
828                                    continue;
829                                }
830                                listener.recoverMessage(loadMessage(entry.getValue().location));
831                            }
832                            sd.orderIndex.resetCursorPosition();
833                        }
834                    });
835                }finally {
836                    indexLock.writeLock().unlock();
837                }
838            }
839    
840            public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
841                    final MessageRecoveryListener listener) throws Exception {
842                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
843                @SuppressWarnings("unused")
844                final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
845                indexLock.writeLock().lock();
846                try {
847                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
848                        public void execute(Transaction tx) throws Exception {
849                            StoredDestination sd = getStoredDestination(dest, tx);
850                            sd.orderIndex.resetCursorPosition();
851                            MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
852                            if (moc == null) {
853                                LastAck pos = getLastAck(tx, sd, subscriptionKey);
854                                if (pos == null) {
855                                    // sub deleted
856                                    return;
857                                }
858                                sd.orderIndex.setBatch(tx, pos);
859                                moc = sd.orderIndex.cursor;
860                            } else {
861                                sd.orderIndex.cursor.sync(moc);
862                            }
863    
864                            Entry<Long, MessageKeys> entry = null;
865                            int counter = 0;
866                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
867                                    .hasNext();) {
868                                entry = iterator.next();
869                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
870                                    continue;
871                                }
872                                if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
873                                    counter++;
874                                }
875                                if (counter >= maxReturned || listener.hasSpace() == false) {
876                                    break;
877                                }
878                            }
879                            sd.orderIndex.stoppedIterating();
880                            if (entry != null) {
881                                MessageOrderCursor copy = sd.orderIndex.cursor.copy();
882                                sd.subscriptionCursors.put(subscriptionKey, copy);
883                            }
884                        }
885                    });
886                }finally {
887                    indexLock.writeLock().unlock();
888                }
889            }
890    
891            public void resetBatching(String clientId, String subscriptionName) {
892                try {
893                    final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
894                    indexLock.writeLock().lock();
895                    try {
896                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
897                            public void execute(Transaction tx) throws IOException {
898                                StoredDestination sd = getStoredDestination(dest, tx);
899                                sd.subscriptionCursors.remove(subscriptionKey);
900                            }
901                        });
902                    }finally {
903                        indexLock.writeLock().unlock();
904                    }
905                } catch (IOException e) {
906                    throw new RuntimeException(e);
907                }
908            }
909        }
910    
911        String subscriptionKey(String clientId, String subscriptionName) {
912            return clientId + ":" + subscriptionName;
913        }
914    
915        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
916            return this.transactionStore.proxy(new KahaDBMessageStore(destination));
917        }
918    
919        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
920            return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
921        }
922    
923        /**
924         * Cleanup method to remove any state associated with the given destination.
925         * This method does not stop the message store (it might not be cached).
926         *
927         * @param destination
928         *            Destination to forget
929         */
930        public void removeQueueMessageStore(ActiveMQQueue destination) {
931        }
932    
933        /**
934         * Cleanup method to remove any state associated with the given destination
935         * This method does not stop the message store (it might not be cached).
936         *
937         * @param destination
938         *            Destination to forget
939         */
940        public void removeTopicMessageStore(ActiveMQTopic destination) {
941        }
942    
943        public void deleteAllMessages() throws IOException {
944            deleteAllMessages = true;
945        }
946    
947        public Set<ActiveMQDestination> getDestinations() {
948            try {
949                final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
950                indexLock.writeLock().lock();
951                try {
952                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
953                        public void execute(Transaction tx) throws IOException {
954                            for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
955                                    .hasNext();) {
956                                Entry<String, StoredDestination> entry = iterator.next();
957                                if (!isEmptyTopic(entry, tx)) {
958                                    rc.add(convert(entry.getKey()));
959                                }
960                            }
961                        }
962    
963                        private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
964                                throws IOException {
965                            boolean isEmptyTopic = false;
966                            ActiveMQDestination dest = convert(entry.getKey());
967                            if (dest.isTopic()) {
968                                StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
969                                if (loadedStore.subscriptionAcks.isEmpty(tx)) {
970                                    isEmptyTopic = true;
971                                }
972                            }
973                            return isEmptyTopic;
974                        }
975                    });
976                }finally {
977                    indexLock.writeLock().unlock();
978                }
979                return rc;
980            } catch (IOException e) {
981                throw new RuntimeException(e);
982            }
983        }
984    
985        public long getLastMessageBrokerSequenceId() throws IOException {
986            return 0;
987        }
988    
989        public long getLastProducerSequenceId(ProducerId id) {
990            indexLock.readLock().lock();
991            try {
992                return metadata.producerSequenceIdTracker.getLastSeqId(id);
993            } finally {
994                indexLock.readLock().unlock();
995            }
996        }
997    
998        public long size() {
999            try {
1000                return journalSize.get() + getPageFile().getDiskSize();
1001            } catch (IOException e) {
1002                throw new RuntimeException(e);
1003            }
1004        }
1005    
1006        public void beginTransaction(ConnectionContext context) throws IOException {
1007            throw new IOException("Not yet implemented.");
1008        }
1009        public void commitTransaction(ConnectionContext context) throws IOException {
1010            throw new IOException("Not yet implemented.");
1011        }
1012        public void rollbackTransaction(ConnectionContext context) throws IOException {
1013            throw new IOException("Not yet implemented.");
1014        }
1015    
1016        public void checkpoint(boolean sync) throws IOException {
1017            super.checkpointCleanup(sync);
1018        }
1019    
1020        // /////////////////////////////////////////////////////////////////
1021        // Internal helper methods.
1022        // /////////////////////////////////////////////////////////////////
1023    
1024        /**
1025         * @param location
1026         * @return
1027         * @throws IOException
1028         */
1029        Message loadMessage(Location location) throws IOException {
1030            KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
1031            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1032            return msg;
1033        }
1034    
1035        // /////////////////////////////////////////////////////////////////
1036        // Internal conversion methods.
1037        // /////////////////////////////////////////////////////////////////
1038    
1039        KahaLocation convert(Location location) {
1040            KahaLocation rc = new KahaLocation();
1041            rc.setLogId(location.getDataFileId());
1042            rc.setOffset(location.getOffset());
1043            return rc;
1044        }
1045    
1046        KahaDestination convert(ActiveMQDestination dest) {
1047            KahaDestination rc = new KahaDestination();
1048            rc.setName(dest.getPhysicalName());
1049            switch (dest.getDestinationType()) {
1050            case ActiveMQDestination.QUEUE_TYPE:
1051                rc.setType(DestinationType.QUEUE);
1052                return rc;
1053            case ActiveMQDestination.TOPIC_TYPE:
1054                rc.setType(DestinationType.TOPIC);
1055                return rc;
1056            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1057                rc.setType(DestinationType.TEMP_QUEUE);
1058                return rc;
1059            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1060                rc.setType(DestinationType.TEMP_TOPIC);
1061                return rc;
1062            default:
1063                return null;
1064            }
1065        }
1066    
1067        ActiveMQDestination convert(String dest) {
1068            int p = dest.indexOf(":");
1069            if (p < 0) {
1070                throw new IllegalArgumentException("Not in the valid destination format");
1071            }
1072            int type = Integer.parseInt(dest.substring(0, p));
1073            String name = dest.substring(p + 1);
1074            return convert(type, name);
1075        }
1076    
1077        private ActiveMQDestination convert(KahaDestination commandDestination) {
1078            return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1079        }
1080    
1081        private ActiveMQDestination convert(int type, String name) {
1082            switch (KahaDestination.DestinationType.valueOf(type)) {
1083            case QUEUE:
1084                return new ActiveMQQueue(name);
1085            case TOPIC:
1086                return new ActiveMQTopic(name);
1087            case TEMP_QUEUE:
1088                return new ActiveMQTempQueue(name);
1089            case TEMP_TOPIC:
1090                return new ActiveMQTempTopic(name);
1091            default:
1092                throw new IllegalArgumentException("Not in the valid destination format");
1093            }
1094        }
1095    
1096        public TransactionIdTransformer getTransactionIdTransformer() {
1097            return transactionIdTransformer;
1098        }
1099    
1100        public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1101            this.transactionIdTransformer = transactionIdTransformer;
1102        }
1103    
1104        static class AsyncJobKey {
1105            MessageId id;
1106            ActiveMQDestination destination;
1107    
1108            AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1109                this.id = id;
1110                this.destination = destination;
1111            }
1112    
1113            @Override
1114            public boolean equals(Object obj) {
1115                if (obj == this) {
1116                    return true;
1117                }
1118                return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1119                        && destination.equals(((AsyncJobKey) obj).destination);
1120            }
1121    
1122            @Override
1123            public int hashCode() {
1124                return id.hashCode() + destination.hashCode();
1125            }
1126    
1127            @Override
1128            public String toString() {
1129                return destination.getPhysicalName() + "-" + id;
1130            }
1131        }
1132    
1133        public interface StoreTask {
1134            public boolean cancel();
1135    
1136            public void aquireLocks();
1137    
1138            public void releaseLocks();
1139        }
1140    
1141        class StoreQueueTask implements Runnable, StoreTask {
1142            protected final Message message;
1143            protected final ConnectionContext context;
1144            protected final KahaDBMessageStore store;
1145            protected final InnerFutureTask future;
1146            protected final AtomicBoolean done = new AtomicBoolean();
1147            protected final AtomicBoolean locked = new AtomicBoolean();
1148    
1149            public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1150                this.store = store;
1151                this.context = context;
1152                this.message = message;
1153                this.future = new InnerFutureTask(this);
1154            }
1155    
1156            public Future<Object> getFuture() {
1157                return this.future;
1158            }
1159    
1160            public boolean cancel() {
1161                if (this.done.compareAndSet(false, true)) {
1162                    return this.future.cancel(false);
1163                }
1164                return false;
1165            }
1166    
1167            public void aquireLocks() {
1168                if (this.locked.compareAndSet(false, true)) {
1169                    try {
1170                        globalQueueSemaphore.acquire();
1171                        store.acquireLocalAsyncLock();
1172                        message.incrementReferenceCount();
1173                    } catch (InterruptedException e) {
1174                        LOG.warn("Failed to aquire lock", e);
1175                    }
1176                }
1177    
1178            }
1179    
1180            public void releaseLocks() {
1181                if (this.locked.compareAndSet(true, false)) {
1182                    store.releaseLocalAsyncLock();
1183                    globalQueueSemaphore.release();
1184                    message.decrementReferenceCount();
1185                }
1186            }
1187    
1188            public void run() {
1189                this.store.doneTasks++;
1190                try {
1191                    if (this.done.compareAndSet(false, true)) {
1192                        this.store.addMessage(context, message);
1193                        removeQueueTask(this.store, this.message.getMessageId());
1194                        this.future.complete();
1195                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1196                        System.err.println(this.store.dest.getName() + " cancelled: "
1197                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1198                        this.store.canceledTasks = this.store.doneTasks = 0;
1199                    }
1200                } catch (Exception e) {
1201                    this.future.setException(e);
1202                }
1203            }
1204    
1205            protected Message getMessage() {
1206                return this.message;
1207            }
1208    
1209            private class InnerFutureTask extends FutureTask<Object> {
1210    
1211                public InnerFutureTask(Runnable runnable) {
1212                    super(runnable, null);
1213    
1214                }
1215    
1216                public void setException(final Exception e) {
1217                    super.setException(e);
1218                }
1219    
1220                public void complete() {
1221                    super.set(null);
1222                }
1223            }
1224        }
1225    
1226        class StoreTopicTask extends StoreQueueTask {
1227            private final int subscriptionCount;
1228            private final List<String> subscriptionKeys = new ArrayList<String>(1);
1229            private final KahaDBTopicMessageStore topicStore;
1230            public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1231                    int subscriptionCount) {
1232                super(store, context, message);
1233                this.topicStore = store;
1234                this.subscriptionCount = subscriptionCount;
1235    
1236            }
1237    
1238            @Override
1239            public void aquireLocks() {
1240                if (this.locked.compareAndSet(false, true)) {
1241                    try {
1242                        globalTopicSemaphore.acquire();
1243                        store.acquireLocalAsyncLock();
1244                        message.incrementReferenceCount();
1245                    } catch (InterruptedException e) {
1246                        LOG.warn("Failed to aquire lock", e);
1247                    }
1248                }
1249    
1250            }
1251    
1252            @Override
1253            public void releaseLocks() {
1254                if (this.locked.compareAndSet(true, false)) {
1255                    message.decrementReferenceCount();
1256                    store.releaseLocalAsyncLock();
1257                    globalTopicSemaphore.release();
1258                }
1259            }
1260    
1261            /**
1262             * add a key
1263             *
1264             * @param key
1265             * @return true if all acknowledgements received
1266             */
1267            public boolean addSubscriptionKey(String key) {
1268                synchronized (this.subscriptionKeys) {
1269                    this.subscriptionKeys.add(key);
1270                }
1271                return this.subscriptionKeys.size() >= this.subscriptionCount;
1272            }
1273    
1274            @Override
1275            public void run() {
1276                this.store.doneTasks++;
1277                try {
1278                    if (this.done.compareAndSet(false, true)) {
1279                        this.topicStore.addMessage(context, message);
1280                        // apply any acks we have
1281                        synchronized (this.subscriptionKeys) {
1282                            for (String key : this.subscriptionKeys) {
1283                                this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1284    
1285                            }
1286                        }
1287                        removeTopicTask(this.topicStore, this.message.getMessageId());
1288                        this.future.complete();
1289                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1290                        System.err.println(this.store.dest.getName() + " cancelled: "
1291                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1292                        this.store.canceledTasks = this.store.doneTasks = 0;
1293                    }
1294                } catch (Exception e) {
1295                    this.future.setException(e);
1296                }
1297            }
1298        }
1299    
1300        public class StoreTaskExecutor extends ThreadPoolExecutor {
1301    
1302            public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1303                super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1304            }
1305    
1306            protected void afterExecute(Runnable runnable, Throwable throwable) {
1307                super.afterExecute(runnable, throwable);
1308    
1309                if (runnable instanceof StoreTask) {
1310                   ((StoreTask)runnable).releaseLocks();
1311                }
1312    
1313            }
1314        }
1315    }