001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.util.ArrayList;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.LinkedList;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.Map.Entry;
031    import java.util.concurrent.*;
032    import java.util.concurrent.atomic.AtomicBoolean;
033    import java.util.concurrent.atomic.AtomicInteger;
034    import org.apache.activemq.broker.ConnectionContext;
035    import org.apache.activemq.broker.region.Destination;
036    import org.apache.activemq.broker.region.RegionBroker;
037    import org.apache.activemq.command.ActiveMQDestination;
038    import org.apache.activemq.command.ActiveMQQueue;
039    import org.apache.activemq.command.ActiveMQTempQueue;
040    import org.apache.activemq.command.ActiveMQTempTopic;
041    import org.apache.activemq.command.ActiveMQTopic;
042    import org.apache.activemq.command.Message;
043    import org.apache.activemq.command.MessageAck;
044    import org.apache.activemq.command.MessageId;
045    import org.apache.activemq.command.ProducerId;
046    import org.apache.activemq.command.SubscriptionInfo;
047    import org.apache.activemq.command.TransactionId;
048    import org.apache.activemq.openwire.OpenWireFormat;
049    import org.apache.activemq.protobuf.Buffer;
050    import org.apache.activemq.store.AbstractMessageStore;
051    import org.apache.activemq.store.MessageRecoveryListener;
052    import org.apache.activemq.store.MessageStore;
053    import org.apache.activemq.store.PersistenceAdapter;
054    import org.apache.activemq.store.TopicMessageStore;
055    import org.apache.activemq.store.TransactionStore;
056    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
057    import org.apache.activemq.store.kahadb.data.KahaDestination;
058    import org.apache.activemq.store.kahadb.data.KahaLocation;
059    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
060    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
061    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
062    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
063    import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
064    import org.apache.activemq.usage.MemoryUsage;
065    import org.apache.activemq.usage.SystemUsage;
066    import org.apache.activemq.util.ServiceStopper;
067    import org.apache.activemq.util.ThreadPoolUtils;
068    import org.apache.activemq.wireformat.WireFormat;
069    import org.slf4j.Logger;
070    import org.slf4j.LoggerFactory;
071    import org.apache.activemq.store.kahadb.disk.journal.Location;
072    import org.apache.activemq.store.kahadb.disk.page.Transaction;
073    
074    public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
075        static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
076        private static final int MAX_ASYNC_JOBS = 10000;
077    
078        public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
079        public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
080                PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
081        public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
082        private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
083                PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
084    
085        protected ExecutorService queueExecutor;
086        protected ExecutorService topicExecutor;
087        protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
088        protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
089        final WireFormat wireFormat = new OpenWireFormat();
090        private SystemUsage usageManager;
091        private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
092        private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
093        Semaphore globalQueueSemaphore;
094        Semaphore globalTopicSemaphore;
095        private boolean concurrentStoreAndDispatchQueues = true;
096        // when true, message order may be compromised when cache is exhausted if store is out
097        // or order w.r.t cache
098        private boolean concurrentStoreAndDispatchTopics = false;
099        private boolean concurrentStoreAndDispatchTransactions = false;
100        private int maxAsyncJobs = MAX_ASYNC_JOBS;
101        private final KahaDBTransactionStore transactionStore;
102        private TransactionIdTransformer transactionIdTransformer;
103    
104        public KahaDBStore() {
105            this.transactionStore = new KahaDBTransactionStore(this);
106            this.transactionIdTransformer = new TransactionIdTransformer() {
107                @Override
108                public KahaTransactionInfo transform(TransactionId txid) {
109                    return TransactionIdConversion.convert(txid);
110                }
111            };
112        }
113    
114        @Override
115        public String toString() {
116            return "KahaDB:[" + directory.getAbsolutePath() + "]";
117        }
118    
119        public void setBrokerName(String brokerName) {
120        }
121    
122        public void setUsageManager(SystemUsage usageManager) {
123            this.usageManager = usageManager;
124        }
125    
126        public SystemUsage getUsageManager() {
127            return this.usageManager;
128        }
129    
130        /**
131         * @return the concurrentStoreAndDispatch
132         */
133        public boolean isConcurrentStoreAndDispatchQueues() {
134            return this.concurrentStoreAndDispatchQueues;
135        }
136    
137        /**
138         * @param concurrentStoreAndDispatch
139         *            the concurrentStoreAndDispatch to set
140         */
141        public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
142            this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
143        }
144    
145        /**
146         * @return the concurrentStoreAndDispatch
147         */
148        public boolean isConcurrentStoreAndDispatchTopics() {
149            return this.concurrentStoreAndDispatchTopics;
150        }
151    
152        /**
153         * @param concurrentStoreAndDispatch
154         *            the concurrentStoreAndDispatch to set
155         */
156        public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
157            this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
158        }
159    
160        public boolean isConcurrentStoreAndDispatchTransactions() {
161            return this.concurrentStoreAndDispatchTransactions;
162        }
163    
164        /**
165         * @return the maxAsyncJobs
166         */
167        public int getMaxAsyncJobs() {
168            return this.maxAsyncJobs;
169        }
170        /**
171         * @param maxAsyncJobs
172         *            the maxAsyncJobs to set
173         */
174        public void setMaxAsyncJobs(int maxAsyncJobs) {
175            this.maxAsyncJobs = maxAsyncJobs;
176        }
177    
178        @Override
179        public void doStart() throws Exception {
180            super.doStart();
181            this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
182            this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
183            this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
184            this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
185            this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
186                    asyncQueueJobQueue, new ThreadFactory() {
187                        public Thread newThread(Runnable runnable) {
188                            Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
189                            thread.setDaemon(true);
190                            return thread;
191                        }
192                    });
193            this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
194                    asyncTopicJobQueue, new ThreadFactory() {
195                        public Thread newThread(Runnable runnable) {
196                            Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
197                            thread.setDaemon(true);
198                            return thread;
199                        }
200                    });
201        }
202    
203        @Override
204        public void doStop(ServiceStopper stopper) throws Exception {
205            // drain down async jobs
206            LOG.info("Stopping async queue tasks");
207            if (this.globalQueueSemaphore != null) {
208                this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
209            }
210            synchronized (this.asyncQueueMaps) {
211                for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
212                    synchronized (m) {
213                        for (StoreTask task : m.values()) {
214                            task.cancel();
215                        }
216                    }
217                }
218                this.asyncQueueMaps.clear();
219            }
220            LOG.info("Stopping async topic tasks");
221            if (this.globalTopicSemaphore != null) {
222                this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
223            }
224            synchronized (this.asyncTopicMaps) {
225                for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
226                    synchronized (m) {
227                        for (StoreTask task : m.values()) {
228                            task.cancel();
229                        }
230                    }
231                }
232                this.asyncTopicMaps.clear();
233            }
234            if (this.globalQueueSemaphore != null) {
235                this.globalQueueSemaphore.drainPermits();
236            }
237            if (this.globalTopicSemaphore != null) {
238                this.globalTopicSemaphore.drainPermits();
239            }
240            if (this.queueExecutor != null) {
241                ThreadPoolUtils.shutdownNow(queueExecutor);
242                queueExecutor = null;
243            }
244            if (this.topicExecutor != null) {
245                ThreadPoolUtils.shutdownNow(topicExecutor);
246                topicExecutor = null;
247            }
248            LOG.info("Stopped KahaDB");
249            super.doStop(stopper);
250        }
251    
252        void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
253            Location location;
254            this.indexLock.writeLock().lock();
255            try {
256                  location = findMessageLocation(key, destination);
257            } finally {
258                this.indexLock.writeLock().unlock();
259            }
260    
261            if (location != null) {
262                KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
263                Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
264    
265                message.incrementRedeliveryCounter();
266                if (LOG.isTraceEnabled()) {
267                    LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
268                }
269                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
270                addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
271    
272                final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
273    
274                this.indexLock.writeLock().lock();
275                try {
276                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
277                        public void execute(Transaction tx) throws IOException {
278                            StoredDestination sd = getStoredDestination(destination, tx);
279                            Long sequence = sd.messageIdIndex.get(tx, key);
280                            MessageKeys keys = sd.orderIndex.get(tx, sequence);
281                            sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
282                        }
283                    });
284                } finally {
285                    this.indexLock.writeLock().unlock();
286                }
287            }
288        }
289    
290        @Override
291        void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
292            if (brokerService != null) {
293                RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
294                if (regionBroker != null) {
295                    Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination));
296                    for (Destination destination : destinationSet) {
297                        destination.getDestinationStatistics().getMessages().decrement();
298                        destination.getDestinationStatistics().getEnqueues().decrement();
299                    }
300                }
301            }
302        }
303    
304        private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
305            return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
306                public Location execute(Transaction tx) throws IOException {
307                    StoredDestination sd = getStoredDestination(destination, tx);
308                    Long sequence = sd.messageIdIndex.get(tx, key);
309                    if (sequence == null) {
310                        return null;
311                    }
312                    return sd.orderIndex.get(tx, sequence).location;
313                }
314            });
315        }
316    
317        protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
318            StoreQueueTask task = null;
319            synchronized (store.asyncTaskMap) {
320                task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
321            }
322            return task;
323        }
324    
325        protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
326            synchronized (store.asyncTaskMap) {
327                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
328            }
329            this.queueExecutor.execute(task);
330        }
331    
332        protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
333            StoreTopicTask task = null;
334            synchronized (store.asyncTaskMap) {
335                task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
336            }
337            return task;
338        }
339    
340        protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
341            synchronized (store.asyncTaskMap) {
342                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
343            }
344            this.topicExecutor.execute(task);
345        }
346    
347        public TransactionStore createTransactionStore() throws IOException {
348            return this.transactionStore;
349        }
350    
351        public boolean getForceRecoverIndex() {
352            return this.forceRecoverIndex;
353        }
354    
355        public void setForceRecoverIndex(boolean forceRecoverIndex) {
356            this.forceRecoverIndex = forceRecoverIndex;
357        }
358    
359        public class KahaDBMessageStore extends AbstractMessageStore {
360            protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
361            protected KahaDestination dest;
362            private final int maxAsyncJobs;
363            private final Semaphore localDestinationSemaphore;
364    
365            double doneTasks, canceledTasks = 0;
366    
367            public KahaDBMessageStore(ActiveMQDestination destination) {
368                super(destination);
369                this.dest = convert(destination);
370                this.maxAsyncJobs = getMaxAsyncJobs();
371                this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
372            }
373    
374            @Override
375            public ActiveMQDestination getDestination() {
376                return destination;
377            }
378    
379            @Override
380            public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
381                    throws IOException {
382                if (isConcurrentStoreAndDispatchQueues()) {
383                    StoreQueueTask result = new StoreQueueTask(this, context, message);
384                    result.aquireLocks();
385                    addQueueTask(this, result);
386                    return result.getFuture();
387                } else {
388                    return super.asyncAddQueueMessage(context, message);
389                }
390            }
391    
392            @Override
393            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
394                if (isConcurrentStoreAndDispatchQueues()) {
395                    AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
396                    StoreQueueTask task = null;
397                    synchronized (asyncTaskMap) {
398                        task = (StoreQueueTask) asyncTaskMap.get(key);
399                    }
400                    if (task != null) {
401                        if (ack.isInTransaction() || !task.cancel()) {
402                            try {
403                                task.future.get();
404                            } catch (InterruptedException e) {
405                                throw new InterruptedIOException(e.toString());
406                            } catch (Exception ignored) {
407                                LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
408                            }
409                            removeMessage(context, ack);
410                        } else {
411                            synchronized (asyncTaskMap) {
412                                asyncTaskMap.remove(key);
413                            }
414                        }
415                    } else {
416                        removeMessage(context, ack);
417                    }
418                } else {
419                    removeMessage(context, ack);
420                }
421            }
422    
423            public void addMessage(ConnectionContext context, Message message) throws IOException {
424                KahaAddMessageCommand command = new KahaAddMessageCommand();
425                command.setDestination(dest);
426                command.setMessageId(message.getMessageId().toString());
427                command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
428                command.setPriority(message.getPriority());
429                command.setPrioritySupported(isPrioritizedMessages());
430                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
431                command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
432                store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
433    
434            }
435    
436            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
437                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
438                command.setDestination(dest);
439                command.setMessageId(ack.getLastMessageId().toString());
440                command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
441    
442                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
443                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
444                store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
445            }
446    
447            public void removeAllMessages(ConnectionContext context) throws IOException {
448                KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
449                command.setDestination(dest);
450                store(command, true, null, null);
451            }
452    
453            public Message getMessage(MessageId identity) throws IOException {
454                final String key = identity.toString();
455    
456                // Hopefully one day the page file supports concurrent read
457                // operations... but for now we must
458                // externally synchronize...
459                Location location;
460                indexLock.writeLock().lock();
461                try {
462                    location = findMessageLocation(key, dest);
463                }finally {
464                    indexLock.writeLock().unlock();
465                }
466                if (location == null) {
467                    return null;
468                }
469    
470                return loadMessage(location);
471            }
472    
473            public int getMessageCount() throws IOException {
474                try {
475                    lockAsyncJobQueue();
476                    indexLock.writeLock().lock();
477                    try {
478                        return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
479                            public Integer execute(Transaction tx) throws IOException {
480                                // Iterate through all index entries to get a count
481                                // of
482                                // messages in the destination.
483                                StoredDestination sd = getStoredDestination(dest, tx);
484                                int rc = 0;
485                                for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
486                                        .hasNext();) {
487                                    iterator.next();
488                                    rc++;
489                                }
490                                return rc;
491                            }
492                        });
493                    }finally {
494                        indexLock.writeLock().unlock();
495                    }
496                } finally {
497                    unlockAsyncJobQueue();
498                }
499            }
500    
501            @Override
502            public boolean isEmpty() throws IOException {
503                indexLock.writeLock().lock();
504                try {
505                    return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
506                        public Boolean execute(Transaction tx) throws IOException {
507                            // Iterate through all index entries to get a count of
508                            // messages in the destination.
509                            StoredDestination sd = getStoredDestination(dest, tx);
510                            return sd.locationIndex.isEmpty(tx);
511                        }
512                    });
513                }finally {
514                    indexLock.writeLock().unlock();
515                }
516            }
517    
518            public void recover(final MessageRecoveryListener listener) throws Exception {
519                // recovery may involve expiry which will modify
520                indexLock.writeLock().lock();
521                try {
522                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
523                        public void execute(Transaction tx) throws Exception {
524                            StoredDestination sd = getStoredDestination(dest, tx);
525                            sd.orderIndex.resetCursorPosition();
526                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
527                                    .hasNext(); ) {
528                                Entry<Long, MessageKeys> entry = iterator.next();
529                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
530                                    continue;
531                                }
532                                Message msg = loadMessage(entry.getValue().location);
533                                listener.recoverMessage(msg);
534                            }
535                        }
536                    });
537                }finally {
538                    indexLock.writeLock().unlock();
539                }
540            }
541    
542    
543            public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
544                indexLock.writeLock().lock();
545                try {
546                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
547                        public void execute(Transaction tx) throws Exception {
548                            StoredDestination sd = getStoredDestination(dest, tx);
549                            Entry<Long, MessageKeys> entry = null;
550                            int counter = 0;
551                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
552                                 listener.hasSpace() && iterator.hasNext(); ) {
553                                entry = iterator.next();
554                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
555                                    continue;
556                                }
557                                Message msg = loadMessage(entry.getValue().location);
558                                listener.recoverMessage(msg);
559                                counter++;
560                                if (counter >= maxReturned) {
561                                    break;
562                                }
563                            }
564                            sd.orderIndex.stoppedIterating();
565                        }
566                    });
567                }finally {
568                    indexLock.writeLock().unlock();
569                }
570            }
571    
572            public void resetBatching() {
573                if (pageFile.isLoaded()) {
574                    indexLock.writeLock().lock();
575                    try {
576                        pageFile.tx().execute(new Transaction.Closure<Exception>() {
577                            public void execute(Transaction tx) throws Exception {
578                                StoredDestination sd = getExistingStoredDestination(dest, tx);
579                                if (sd != null) {
580                                    sd.orderIndex.resetCursorPosition();}
581                                }
582                            });
583                    } catch (Exception e) {
584                        LOG.error("Failed to reset batching",e);
585                    }finally {
586                        indexLock.writeLock().unlock();
587                    }
588                }
589            }
590    
591            @Override
592            public void setBatch(MessageId identity) throws IOException {
593                try {
594                    final String key = identity.toString();
595                    lockAsyncJobQueue();
596    
597                    // Hopefully one day the page file supports concurrent read
598                    // operations... but for now we must
599                    // externally synchronize...
600    
601                    indexLock.writeLock().lock();
602                    try {
603                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
604                            public void execute(Transaction tx) throws IOException {
605                                StoredDestination sd = getStoredDestination(dest, tx);
606                                Long location = sd.messageIdIndex.get(tx, key);
607                                if (location != null) {
608                                    sd.orderIndex.setBatch(tx, location);
609                                }
610                            }
611                        });
612                    } finally {
613                        indexLock.writeLock().unlock();
614                    }
615                } finally {
616                    unlockAsyncJobQueue();
617                }
618            }
619    
620            @Override
621            public void setMemoryUsage(MemoryUsage memoeyUSage) {
622            }
623            @Override
624            public void start() throws Exception {
625                super.start();
626            }
627            @Override
628            public void stop() throws Exception {
629                super.stop();
630            }
631    
632            protected void lockAsyncJobQueue() {
633                try {
634                    this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
635                } catch (Exception e) {
636                    LOG.error("Failed to lock async jobs for " + this.destination, e);
637                }
638            }
639    
640            protected void unlockAsyncJobQueue() {
641                this.localDestinationSemaphore.release(this.maxAsyncJobs);
642            }
643    
644            protected void acquireLocalAsyncLock() {
645                try {
646                    this.localDestinationSemaphore.acquire();
647                } catch (InterruptedException e) {
648                    LOG.error("Failed to aquire async lock for " + this.destination, e);
649                }
650            }
651    
652            protected void releaseLocalAsyncLock() {
653                this.localDestinationSemaphore.release();
654            }
655    
656        }
657    
658        class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
659            private final AtomicInteger subscriptionCount = new AtomicInteger();
660            public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
661                super(destination);
662                this.subscriptionCount.set(getAllSubscriptions().length);
663                asyncTopicMaps.add(asyncTaskMap);
664            }
665    
666            @Override
667            public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
668                    throws IOException {
669                if (isConcurrentStoreAndDispatchTopics()) {
670                    StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
671                    result.aquireLocks();
672                    addTopicTask(this, result);
673                    return result.getFuture();
674                } else {
675                    return super.asyncAddTopicMessage(context, message);
676                }
677            }
678    
679            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
680                                    MessageId messageId, MessageAck ack)
681                    throws IOException {
682                String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
683                if (isConcurrentStoreAndDispatchTopics()) {
684                    AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
685                    StoreTopicTask task = null;
686                    synchronized (asyncTaskMap) {
687                        task = (StoreTopicTask) asyncTaskMap.get(key);
688                    }
689                    if (task != null) {
690                        if (task.addSubscriptionKey(subscriptionKey)) {
691                            removeTopicTask(this, messageId);
692                            if (task.cancel()) {
693                                synchronized (asyncTaskMap) {
694                                    asyncTaskMap.remove(key);
695                                }
696                            }
697                        }
698                    } else {
699                        doAcknowledge(context, subscriptionKey, messageId, ack);
700                    }
701                } else {
702                    doAcknowledge(context, subscriptionKey, messageId, ack);
703                }
704            }
705    
706            protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
707                    throws IOException {
708                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
709                command.setDestination(dest);
710                command.setSubscriptionKey(subscriptionKey);
711                command.setMessageId(messageId.toString());
712                command.setTransactionInfo(ack != null ? transactionIdTransformer.transform(ack.getTransactionId()) : null);
713                if (ack != null && ack.isUnmatchedAck()) {
714                    command.setAck(UNMATCHED);
715                } else {
716                    org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
717                    command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
718                }
719                store(command, false, null, null);
720            }
721    
722            public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
723                String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
724                        .getSubscriptionName());
725                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
726                command.setDestination(dest);
727                command.setSubscriptionKey(subscriptionKey.toString());
728                command.setRetroactive(retroactive);
729                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
730                command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
731                store(command, isEnableJournalDiskSyncs() && true, null, null);
732                this.subscriptionCount.incrementAndGet();
733            }
734    
735            public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
736                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
737                command.setDestination(dest);
738                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
739                store(command, isEnableJournalDiskSyncs() && true, null, null);
740                this.subscriptionCount.decrementAndGet();
741            }
742    
743            public SubscriptionInfo[] getAllSubscriptions() throws IOException {
744    
745                final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
746                indexLock.writeLock().lock();
747                try {
748                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
749                        public void execute(Transaction tx) throws IOException {
750                            StoredDestination sd = getStoredDestination(dest, tx);
751                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
752                                    .hasNext();) {
753                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
754                                SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
755                                        .getValue().getSubscriptionInfo().newInput()));
756                                subscriptions.add(info);
757    
758                            }
759                        }
760                    });
761                }finally {
762                    indexLock.writeLock().unlock();
763                }
764    
765                SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
766                subscriptions.toArray(rc);
767                return rc;
768            }
769    
770            public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
771                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
772                indexLock.writeLock().lock();
773                try {
774                    return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
775                        public SubscriptionInfo execute(Transaction tx) throws IOException {
776                            StoredDestination sd = getStoredDestination(dest, tx);
777                            KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
778                            if (command == null) {
779                                return null;
780                            }
781                            return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
782                                    .getSubscriptionInfo().newInput()));
783                        }
784                    });
785                }finally {
786                    indexLock.writeLock().unlock();
787                }
788            }
789    
790            public int getMessageCount(String clientId, String subscriptionName) throws IOException {
791                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
792                indexLock.writeLock().lock();
793                try {
794                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
795                        public Integer execute(Transaction tx) throws IOException {
796                            StoredDestination sd = getStoredDestination(dest, tx);
797                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
798                            if (cursorPos == null) {
799                                // The subscription might not exist.
800                                return 0;
801                            }
802    
803                            return (int) getStoredMessageCount(tx, sd, subscriptionKey);
804                        }
805                    });
806                }finally {
807                    indexLock.writeLock().unlock();
808                }
809            }
810    
811            public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
812                    throws Exception {
813                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
814                @SuppressWarnings("unused")
815                final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
816                indexLock.writeLock().lock();
817                try {
818                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
819                        public void execute(Transaction tx) throws Exception {
820                            StoredDestination sd = getStoredDestination(dest, tx);
821                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
822                            sd.orderIndex.setBatch(tx, cursorPos);
823                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
824                                    .hasNext();) {
825                                Entry<Long, MessageKeys> entry = iterator.next();
826                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
827                                    continue;
828                                }
829                                listener.recoverMessage(loadMessage(entry.getValue().location));
830                            }
831                            sd.orderIndex.resetCursorPosition();
832                        }
833                    });
834                }finally {
835                    indexLock.writeLock().unlock();
836                }
837            }
838    
839            public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
840                    final MessageRecoveryListener listener) throws Exception {
841                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
842                @SuppressWarnings("unused")
843                final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
844                indexLock.writeLock().lock();
845                try {
846                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
847                        public void execute(Transaction tx) throws Exception {
848                            StoredDestination sd = getStoredDestination(dest, tx);
849                            sd.orderIndex.resetCursorPosition();
850                            MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
851                            if (moc == null) {
852                                LastAck pos = getLastAck(tx, sd, subscriptionKey);
853                                if (pos == null) {
854                                    // sub deleted
855                                    return;
856                                }
857                                sd.orderIndex.setBatch(tx, pos);
858                                moc = sd.orderIndex.cursor;
859                            } else {
860                                sd.orderIndex.cursor.sync(moc);
861                            }
862    
863                            Entry<Long, MessageKeys> entry = null;
864                            int counter = 0;
865                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
866                                    .hasNext();) {
867                                entry = iterator.next();
868                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
869                                    continue;
870                                }
871                                if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
872                                    counter++;
873                                }
874                                if (counter >= maxReturned || listener.hasSpace() == false) {
875                                    break;
876                                }
877                            }
878                            sd.orderIndex.stoppedIterating();
879                            if (entry != null) {
880                                MessageOrderCursor copy = sd.orderIndex.cursor.copy();
881                                sd.subscriptionCursors.put(subscriptionKey, copy);
882                            }
883                        }
884                    });
885                }finally {
886                    indexLock.writeLock().unlock();
887                }
888            }
889    
890            public void resetBatching(String clientId, String subscriptionName) {
891                try {
892                    final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
893                    indexLock.writeLock().lock();
894                    try {
895                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
896                            public void execute(Transaction tx) throws IOException {
897                                StoredDestination sd = getStoredDestination(dest, tx);
898                                sd.subscriptionCursors.remove(subscriptionKey);
899                            }
900                        });
901                    }finally {
902                        indexLock.writeLock().unlock();
903                    }
904                } catch (IOException e) {
905                    throw new RuntimeException(e);
906                }
907            }
908        }
909    
910        String subscriptionKey(String clientId, String subscriptionName) {
911            return clientId + ":" + subscriptionName;
912        }
913    
914        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
915            return this.transactionStore.proxy(new KahaDBMessageStore(destination));
916        }
917    
918        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
919            return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
920        }
921    
922        /**
923         * Cleanup method to remove any state associated with the given destination.
924         * This method does not stop the message store (it might not be cached).
925         *
926         * @param destination
927         *            Destination to forget
928         */
929        public void removeQueueMessageStore(ActiveMQQueue destination) {
930        }
931    
932        /**
933         * Cleanup method to remove any state associated with the given destination
934         * This method does not stop the message store (it might not be cached).
935         *
936         * @param destination
937         *            Destination to forget
938         */
939        public void removeTopicMessageStore(ActiveMQTopic destination) {
940        }
941    
942        public void deleteAllMessages() throws IOException {
943            deleteAllMessages = true;
944        }
945    
946        public Set<ActiveMQDestination> getDestinations() {
947            try {
948                final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
949                indexLock.writeLock().lock();
950                try {
951                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
952                        public void execute(Transaction tx) throws IOException {
953                            for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
954                                    .hasNext();) {
955                                Entry<String, StoredDestination> entry = iterator.next();
956                                if (!isEmptyTopic(entry, tx)) {
957                                    rc.add(convert(entry.getKey()));
958                                }
959                            }
960                        }
961    
962                        private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
963                                throws IOException {
964                            boolean isEmptyTopic = false;
965                            ActiveMQDestination dest = convert(entry.getKey());
966                            if (dest.isTopic()) {
967                                StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
968                                if (loadedStore.subscriptionAcks.isEmpty(tx)) {
969                                    isEmptyTopic = true;
970                                }
971                            }
972                            return isEmptyTopic;
973                        }
974                    });
975                }finally {
976                    indexLock.writeLock().unlock();
977                }
978                return rc;
979            } catch (IOException e) {
980                throw new RuntimeException(e);
981            }
982        }
983    
984        public long getLastMessageBrokerSequenceId() throws IOException {
985            return 0;
986        }
987    
988        public long getLastProducerSequenceId(ProducerId id) {
989            indexLock.readLock().lock();
990            try {
991                return metadata.producerSequenceIdTracker.getLastSeqId(id);
992            } finally {
993                indexLock.readLock().unlock();
994            }
995        }
996    
997        public long size() {
998            try {
999                return journalSize.get() + getPageFile().getDiskSize();
1000            } catch (IOException e) {
1001                throw new RuntimeException(e);
1002            }
1003        }
1004    
1005        public void beginTransaction(ConnectionContext context) throws IOException {
1006            throw new IOException("Not yet implemented.");
1007        }
1008        public void commitTransaction(ConnectionContext context) throws IOException {
1009            throw new IOException("Not yet implemented.");
1010        }
1011        public void rollbackTransaction(ConnectionContext context) throws IOException {
1012            throw new IOException("Not yet implemented.");
1013        }
1014    
1015        public void checkpoint(boolean sync) throws IOException {
1016            super.checkpointCleanup(sync);
1017        }
1018    
1019        // /////////////////////////////////////////////////////////////////
1020        // Internal helper methods.
1021        // /////////////////////////////////////////////////////////////////
1022    
1023        /**
1024         * @param location
1025         * @return
1026         * @throws IOException
1027         */
1028        Message loadMessage(Location location) throws IOException {
1029            KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
1030            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1031            return msg;
1032        }
1033    
1034        // /////////////////////////////////////////////////////////////////
1035        // Internal conversion methods.
1036        // /////////////////////////////////////////////////////////////////
1037    
1038        KahaLocation convert(Location location) {
1039            KahaLocation rc = new KahaLocation();
1040            rc.setLogId(location.getDataFileId());
1041            rc.setOffset(location.getOffset());
1042            return rc;
1043        }
1044    
1045        KahaDestination convert(ActiveMQDestination dest) {
1046            KahaDestination rc = new KahaDestination();
1047            rc.setName(dest.getPhysicalName());
1048            switch (dest.getDestinationType()) {
1049            case ActiveMQDestination.QUEUE_TYPE:
1050                rc.setType(DestinationType.QUEUE);
1051                return rc;
1052            case ActiveMQDestination.TOPIC_TYPE:
1053                rc.setType(DestinationType.TOPIC);
1054                return rc;
1055            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1056                rc.setType(DestinationType.TEMP_QUEUE);
1057                return rc;
1058            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1059                rc.setType(DestinationType.TEMP_TOPIC);
1060                return rc;
1061            default:
1062                return null;
1063            }
1064        }
1065    
1066        ActiveMQDestination convert(String dest) {
1067            int p = dest.indexOf(":");
1068            if (p < 0) {
1069                throw new IllegalArgumentException("Not in the valid destination format");
1070            }
1071            int type = Integer.parseInt(dest.substring(0, p));
1072            String name = dest.substring(p + 1);
1073            return convert(type, name);
1074        }
1075    
1076        private ActiveMQDestination convert(KahaDestination commandDestination) {
1077            return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1078        }
1079    
1080        private ActiveMQDestination convert(int type, String name) {
1081            switch (KahaDestination.DestinationType.valueOf(type)) {
1082            case QUEUE:
1083                return new ActiveMQQueue(name);
1084            case TOPIC:
1085                return new ActiveMQTopic(name);
1086            case TEMP_QUEUE:
1087                return new ActiveMQTempQueue(name);
1088            case TEMP_TOPIC:
1089                return new ActiveMQTempTopic(name);
1090            default:
1091                throw new IllegalArgumentException("Not in the valid destination format");
1092            }
1093        }
1094    
1095        public TransactionIdTransformer getTransactionIdTransformer() {
1096            return transactionIdTransformer;
1097        }
1098    
1099        public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1100            this.transactionIdTransformer = transactionIdTransformer;
1101        }
1102    
1103        static class AsyncJobKey {
1104            MessageId id;
1105            ActiveMQDestination destination;
1106    
1107            AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1108                this.id = id;
1109                this.destination = destination;
1110            }
1111    
1112            @Override
1113            public boolean equals(Object obj) {
1114                if (obj == this) {
1115                    return true;
1116                }
1117                return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1118                        && destination.equals(((AsyncJobKey) obj).destination);
1119            }
1120    
1121            @Override
1122            public int hashCode() {
1123                return id.hashCode() + destination.hashCode();
1124            }
1125    
1126            @Override
1127            public String toString() {
1128                return destination.getPhysicalName() + "-" + id;
1129            }
1130        }
1131    
1132        public interface StoreTask {
1133            public boolean cancel();
1134    
1135            public void aquireLocks();
1136    
1137            public void releaseLocks();
1138        }
1139    
1140        class StoreQueueTask implements Runnable, StoreTask {
1141            protected final Message message;
1142            protected final ConnectionContext context;
1143            protected final KahaDBMessageStore store;
1144            protected final InnerFutureTask future;
1145            protected final AtomicBoolean done = new AtomicBoolean();
1146            protected final AtomicBoolean locked = new AtomicBoolean();
1147    
1148            public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1149                this.store = store;
1150                this.context = context;
1151                this.message = message;
1152                this.future = new InnerFutureTask(this);
1153            }
1154    
1155            public Future<Object> getFuture() {
1156                return this.future;
1157            }
1158    
1159            public boolean cancel() {
1160                if (this.done.compareAndSet(false, true)) {
1161                    return this.future.cancel(false);
1162                }
1163                return false;
1164            }
1165    
1166            public void aquireLocks() {
1167                if (this.locked.compareAndSet(false, true)) {
1168                    try {
1169                        globalQueueSemaphore.acquire();
1170                        store.acquireLocalAsyncLock();
1171                        message.incrementReferenceCount();
1172                    } catch (InterruptedException e) {
1173                        LOG.warn("Failed to aquire lock", e);
1174                    }
1175                }
1176    
1177            }
1178    
1179            public void releaseLocks() {
1180                if (this.locked.compareAndSet(true, false)) {
1181                    store.releaseLocalAsyncLock();
1182                    globalQueueSemaphore.release();
1183                    message.decrementReferenceCount();
1184                }
1185            }
1186    
1187            public void run() {
1188                this.store.doneTasks++;
1189                try {
1190                    if (this.done.compareAndSet(false, true)) {
1191                        this.store.addMessage(context, message);
1192                        removeQueueTask(this.store, this.message.getMessageId());
1193                        this.future.complete();
1194                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1195                        System.err.println(this.store.dest.getName() + " cancelled: "
1196                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1197                        this.store.canceledTasks = this.store.doneTasks = 0;
1198                    }
1199                } catch (Exception e) {
1200                    this.future.setException(e);
1201                }
1202            }
1203    
1204            protected Message getMessage() {
1205                return this.message;
1206            }
1207    
1208            private class InnerFutureTask extends FutureTask<Object> {
1209    
1210                public InnerFutureTask(Runnable runnable) {
1211                    super(runnable, null);
1212    
1213                }
1214    
1215                public void setException(final Exception e) {
1216                    super.setException(e);
1217                }
1218    
1219                public void complete() {
1220                    super.set(null);
1221                }
1222            }
1223        }
1224    
1225        class StoreTopicTask extends StoreQueueTask {
1226            private final int subscriptionCount;
1227            private final List<String> subscriptionKeys = new ArrayList<String>(1);
1228            private final KahaDBTopicMessageStore topicStore;
1229            public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1230                    int subscriptionCount) {
1231                super(store, context, message);
1232                this.topicStore = store;
1233                this.subscriptionCount = subscriptionCount;
1234    
1235            }
1236    
1237            @Override
1238            public void aquireLocks() {
1239                if (this.locked.compareAndSet(false, true)) {
1240                    try {
1241                        globalTopicSemaphore.acquire();
1242                        store.acquireLocalAsyncLock();
1243                        message.incrementReferenceCount();
1244                    } catch (InterruptedException e) {
1245                        LOG.warn("Failed to aquire lock", e);
1246                    }
1247                }
1248    
1249            }
1250    
1251            @Override
1252            public void releaseLocks() {
1253                if (this.locked.compareAndSet(true, false)) {
1254                    message.decrementReferenceCount();
1255                    store.releaseLocalAsyncLock();
1256                    globalTopicSemaphore.release();
1257                }
1258            }
1259    
1260            /**
1261             * add a key
1262             *
1263             * @param key
1264             * @return true if all acknowledgements received
1265             */
1266            public boolean addSubscriptionKey(String key) {
1267                synchronized (this.subscriptionKeys) {
1268                    this.subscriptionKeys.add(key);
1269                }
1270                return this.subscriptionKeys.size() >= this.subscriptionCount;
1271            }
1272    
1273            @Override
1274            public void run() {
1275                this.store.doneTasks++;
1276                try {
1277                    if (this.done.compareAndSet(false, true)) {
1278                        this.topicStore.addMessage(context, message);
1279                        // apply any acks we have
1280                        synchronized (this.subscriptionKeys) {
1281                            for (String key : this.subscriptionKeys) {
1282                                this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1283    
1284                            }
1285                        }
1286                        removeTopicTask(this.topicStore, this.message.getMessageId());
1287                        this.future.complete();
1288                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1289                        System.err.println(this.store.dest.getName() + " cancelled: "
1290                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1291                        this.store.canceledTasks = this.store.doneTasks = 0;
1292                    }
1293                } catch (Exception e) {
1294                    this.future.setException(e);
1295                }
1296            }
1297        }
1298    
1299        public class StoreTaskExecutor extends ThreadPoolExecutor {
1300    
1301            public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1302                super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1303            }
1304    
1305            protected void afterExecute(Runnable runnable, Throwable throwable) {
1306                super.afterExecute(runnable, throwable);
1307    
1308                if (runnable instanceof StoreTask) {
1309                   ((StoreTask)runnable).releaseLocks();
1310                }
1311    
1312            }
1313        }
1314    }