001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.util.ArrayList;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.LinkedList;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Map.Entry;
030    import java.util.Set;
031    import java.util.concurrent.BlockingQueue;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.Future;
034    import java.util.concurrent.FutureTask;
035    import java.util.concurrent.LinkedBlockingQueue;
036    import java.util.concurrent.Semaphore;
037    import java.util.concurrent.ThreadFactory;
038    import java.util.concurrent.ThreadPoolExecutor;
039    import java.util.concurrent.TimeUnit;
040    import java.util.concurrent.atomic.AtomicBoolean;
041    import java.util.concurrent.atomic.AtomicInteger;
042    
043    import org.apache.activemq.broker.ConnectionContext;
044    import org.apache.activemq.broker.region.Destination;
045    import org.apache.activemq.broker.region.RegionBroker;
046    import org.apache.activemq.command.ActiveMQDestination;
047    import org.apache.activemq.command.ActiveMQQueue;
048    import org.apache.activemq.command.ActiveMQTempQueue;
049    import org.apache.activemq.command.ActiveMQTempTopic;
050    import org.apache.activemq.command.ActiveMQTopic;
051    import org.apache.activemq.command.Message;
052    import org.apache.activemq.command.MessageAck;
053    import org.apache.activemq.command.MessageId;
054    import org.apache.activemq.command.ProducerId;
055    import org.apache.activemq.command.SubscriptionInfo;
056    import org.apache.activemq.command.TransactionId;
057    import org.apache.activemq.openwire.OpenWireFormat;
058    import org.apache.activemq.protobuf.Buffer;
059    import org.apache.activemq.store.*;
060    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
061    import org.apache.activemq.store.kahadb.data.KahaDestination;
062    import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
063    import org.apache.activemq.store.kahadb.data.KahaLocation;
064    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
065    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
066    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
067    import org.apache.activemq.store.kahadb.disk.journal.Location;
068    import org.apache.activemq.store.kahadb.disk.page.Transaction;
069    import org.apache.activemq.usage.MemoryUsage;
070    import org.apache.activemq.usage.SystemUsage;
071    import org.apache.activemq.util.ServiceStopper;
072    import org.apache.activemq.util.ThreadPoolUtils;
073    import org.apache.activemq.wireformat.WireFormat;
074    import org.slf4j.Logger;
075    import org.slf4j.LoggerFactory;
076    
077    public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
078        static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
079        private static final int MAX_ASYNC_JOBS = 10000;
080    
081        public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
082        public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
083                PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
084        public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
085        private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
086                PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
087    
088        protected ExecutorService queueExecutor;
089        protected ExecutorService topicExecutor;
090        protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
091        protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
092        final WireFormat wireFormat = new OpenWireFormat();
093        private SystemUsage usageManager;
094        private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
095        private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
096        Semaphore globalQueueSemaphore;
097        Semaphore globalTopicSemaphore;
098        private boolean concurrentStoreAndDispatchQueues = true;
099        // when true, message order may be compromised when cache is exhausted if store is out
100        // or order w.r.t cache
101        private boolean concurrentStoreAndDispatchTopics = false;
102        private final boolean concurrentStoreAndDispatchTransactions = false;
103        private int maxAsyncJobs = MAX_ASYNC_JOBS;
104        private final KahaDBTransactionStore transactionStore;
105        private TransactionIdTransformer transactionIdTransformer;
106    
107        public KahaDBStore() {
108            this.transactionStore = new KahaDBTransactionStore(this);
109            this.transactionIdTransformer = new TransactionIdTransformer() {
110                @Override
111                public TransactionId transform(TransactionId txid) {
112                    return txid;
113                }
114            };
115        }
116    
117        @Override
118        public String toString() {
119            return "KahaDB:[" + directory.getAbsolutePath() + "]";
120        }
121    
122        @Override
123        public void setBrokerName(String brokerName) {
124        }
125    
126        @Override
127        public void setUsageManager(SystemUsage usageManager) {
128            this.usageManager = usageManager;
129        }
130    
131        public SystemUsage getUsageManager() {
132            return this.usageManager;
133        }
134    
135        /**
136         * @return the concurrentStoreAndDispatch
137         */
138        public boolean isConcurrentStoreAndDispatchQueues() {
139            return this.concurrentStoreAndDispatchQueues;
140        }
141    
142        /**
143         * @param concurrentStoreAndDispatch
144         *            the concurrentStoreAndDispatch to set
145         */
146        public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
147            this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
148        }
149    
150        /**
151         * @return the concurrentStoreAndDispatch
152         */
153        public boolean isConcurrentStoreAndDispatchTopics() {
154            return this.concurrentStoreAndDispatchTopics;
155        }
156    
157        /**
158         * @param concurrentStoreAndDispatch
159         *            the concurrentStoreAndDispatch to set
160         */
161        public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
162            this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
163        }
164    
165        public boolean isConcurrentStoreAndDispatchTransactions() {
166            return this.concurrentStoreAndDispatchTransactions;
167        }
168    
169        /**
170         * @return the maxAsyncJobs
171         */
172        public int getMaxAsyncJobs() {
173            return this.maxAsyncJobs;
174        }
175        /**
176         * @param maxAsyncJobs
177         *            the maxAsyncJobs to set
178         */
179        public void setMaxAsyncJobs(int maxAsyncJobs) {
180            this.maxAsyncJobs = maxAsyncJobs;
181        }
182    
183        @Override
184        public void doStart() throws Exception {
185            if (brokerService != null) {
186                metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
187                wireFormat.setVersion(metadata.openwireVersion);
188    
189                if (LOG.isDebugEnabled()) {
190                    LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
191                }
192    
193            }
194            super.doStart();
195    
196            if (brokerService != null) {
197                // In case the recovered store used a different OpenWire version log a warning
198                // to assist in determining why journal reads fail.
199                if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
200                    LOG.warn("Receovered Store uses a different OpenWire version[{}] " +
201                             "than the version configured[{}].",
202                             metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
203                }
204            }
205    
206            this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
207            this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
208            this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
209            this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
210            this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
211                asyncQueueJobQueue, new ThreadFactory() {
212                    @Override
213                    public Thread newThread(Runnable runnable) {
214                        Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
215                        thread.setDaemon(true);
216                        return thread;
217                    }
218                });
219            this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
220                asyncTopicJobQueue, new ThreadFactory() {
221                    @Override
222                    public Thread newThread(Runnable runnable) {
223                        Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
224                        thread.setDaemon(true);
225                        return thread;
226                    }
227                });
228        }
229    
230        @Override
231        public void doStop(ServiceStopper stopper) throws Exception {
232            // drain down async jobs
233            LOG.info("Stopping async queue tasks");
234            if (this.globalQueueSemaphore != null) {
235                this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
236            }
237            synchronized (this.asyncQueueMaps) {
238                for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
239                    synchronized (m) {
240                        for (StoreTask task : m.values()) {
241                            task.cancel();
242                        }
243                    }
244                }
245                this.asyncQueueMaps.clear();
246            }
247            LOG.info("Stopping async topic tasks");
248            if (this.globalTopicSemaphore != null) {
249                this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
250            }
251            synchronized (this.asyncTopicMaps) {
252                for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
253                    synchronized (m) {
254                        for (StoreTask task : m.values()) {
255                            task.cancel();
256                        }
257                    }
258                }
259                this.asyncTopicMaps.clear();
260            }
261            if (this.globalQueueSemaphore != null) {
262                this.globalQueueSemaphore.drainPermits();
263            }
264            if (this.globalTopicSemaphore != null) {
265                this.globalTopicSemaphore.drainPermits();
266            }
267            if (this.queueExecutor != null) {
268                ThreadPoolUtils.shutdownNow(queueExecutor);
269                queueExecutor = null;
270            }
271            if (this.topicExecutor != null) {
272                ThreadPoolUtils.shutdownNow(topicExecutor);
273                topicExecutor = null;
274            }
275            LOG.info("Stopped KahaDB");
276            super.doStop(stopper);
277        }
278    
279        @Override
280        void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
281            Location location;
282            this.indexLock.writeLock().lock();
283            try {
284                  location = findMessageLocation(key, destination);
285            } finally {
286                this.indexLock.writeLock().unlock();
287            }
288    
289            if (location != null) {
290                KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
291                Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
292    
293                message.incrementRedeliveryCounter();
294                if (LOG.isTraceEnabled()) {
295                    LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
296                }
297                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
298                addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
299    
300                final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
301    
302                this.indexLock.writeLock().lock();
303                try {
304                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
305                        @Override
306                        public void execute(Transaction tx) throws IOException {
307                            StoredDestination sd = getStoredDestination(destination, tx);
308                            Long sequence = sd.messageIdIndex.get(tx, key);
309                            MessageKeys keys = sd.orderIndex.get(tx, sequence);
310                            sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
311                        }
312                    });
313                } finally {
314                    this.indexLock.writeLock().unlock();
315                }
316            }
317        }
318    
319        @Override
320        void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
321            if (brokerService != null) {
322                RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
323                if (regionBroker != null) {
324                    Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination));
325                    for (Destination destination : destinationSet) {
326                        destination.getDestinationStatistics().getMessages().decrement();
327                        destination.getDestinationStatistics().getEnqueues().decrement();
328                    }
329                }
330            }
331        }
332    
333        private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
334            return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
335                @Override
336                public Location execute(Transaction tx) throws IOException {
337                    StoredDestination sd = getStoredDestination(destination, tx);
338                    Long sequence = sd.messageIdIndex.get(tx, key);
339                    if (sequence == null) {
340                        return null;
341                    }
342                    return sd.orderIndex.get(tx, sequence).location;
343                }
344            });
345        }
346    
347        protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
348            StoreQueueTask task = null;
349            synchronized (store.asyncTaskMap) {
350                task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
351            }
352            return task;
353        }
354    
355        protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
356            synchronized (store.asyncTaskMap) {
357                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
358            }
359            this.queueExecutor.execute(task);
360        }
361    
362        protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
363            StoreTopicTask task = null;
364            synchronized (store.asyncTaskMap) {
365                task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
366            }
367            return task;
368        }
369    
370        protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
371            synchronized (store.asyncTaskMap) {
372                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
373            }
374            this.topicExecutor.execute(task);
375        }
376    
377        @Override
378        public TransactionStore createTransactionStore() throws IOException {
379            return this.transactionStore;
380        }
381    
382        public boolean getForceRecoverIndex() {
383            return this.forceRecoverIndex;
384        }
385    
386        public void setForceRecoverIndex(boolean forceRecoverIndex) {
387            this.forceRecoverIndex = forceRecoverIndex;
388        }
389    
390        public class KahaDBMessageStore extends AbstractMessageStore {
391            protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
392            protected KahaDestination dest;
393            private final int maxAsyncJobs;
394            private final Semaphore localDestinationSemaphore;
395    
396            double doneTasks, canceledTasks = 0;
397    
398            public KahaDBMessageStore(ActiveMQDestination destination) {
399                super(destination);
400                this.dest = convert(destination);
401                this.maxAsyncJobs = getMaxAsyncJobs();
402                this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
403            }
404    
405            @Override
406            public ActiveMQDestination getDestination() {
407                return destination;
408            }
409    
410            @Override
411            public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
412                    throws IOException {
413                if (isConcurrentStoreAndDispatchQueues()) {
414                    StoreQueueTask result = new StoreQueueTask(this, context, message);
415                    result.aquireLocks();
416                    addQueueTask(this, result);
417                    return result.getFuture();
418                } else {
419                    return super.asyncAddQueueMessage(context, message);
420                }
421            }
422    
423            @Override
424            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
425                if (isConcurrentStoreAndDispatchQueues()) {
426                    AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
427                    StoreQueueTask task = null;
428                    synchronized (asyncTaskMap) {
429                        task = (StoreQueueTask) asyncTaskMap.get(key);
430                    }
431                    if (task != null) {
432                        if (ack.isInTransaction() || !task.cancel()) {
433                            try {
434                                task.future.get();
435                            } catch (InterruptedException e) {
436                                throw new InterruptedIOException(e.toString());
437                            } catch (Exception ignored) {
438                                LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
439                            }
440                            removeMessage(context, ack);
441                        } else {
442                            synchronized (asyncTaskMap) {
443                                asyncTaskMap.remove(key);
444                            }
445                        }
446                    } else {
447                        removeMessage(context, ack);
448                    }
449                } else {
450                    removeMessage(context, ack);
451                }
452            }
453    
454            @Override
455            public void addMessage(ConnectionContext context, Message message) throws IOException {
456                KahaAddMessageCommand command = new KahaAddMessageCommand();
457                command.setDestination(dest);
458                command.setMessageId(message.getMessageId().toProducerKey());
459                command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
460                command.setPriority(message.getPriority());
461                command.setPrioritySupported(isPrioritizedMessages());
462                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
463                command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
464                store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
465    
466            }
467    
468            @Override
469            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
470                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
471                command.setDestination(dest);
472                command.setMessageId(ack.getLastMessageId().toProducerKey());
473                command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
474    
475                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
476                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
477                store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
478            }
479    
480            @Override
481            public void removeAllMessages(ConnectionContext context) throws IOException {
482                KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
483                command.setDestination(dest);
484                store(command, true, null, null);
485            }
486    
487            @Override
488            public Message getMessage(MessageId identity) throws IOException {
489                final String key = identity.toProducerKey();
490    
491                // Hopefully one day the page file supports concurrent read
492                // operations... but for now we must
493                // externally synchronize...
494                Location location;
495                indexLock.writeLock().lock();
496                try {
497                    location = findMessageLocation(key, dest);
498                }finally {
499                    indexLock.writeLock().unlock();
500                }
501                if (location == null) {
502                    return null;
503                }
504    
505                return loadMessage(location);
506            }
507    
508            @Override
509            public int getMessageCount() throws IOException {
510                try {
511                    lockAsyncJobQueue();
512                    indexLock.writeLock().lock();
513                    try {
514                        return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
515                            @Override
516                            public Integer execute(Transaction tx) throws IOException {
517                                // Iterate through all index entries to get a count
518                                // of
519                                // messages in the destination.
520                                StoredDestination sd = getStoredDestination(dest, tx);
521                                int rc = 0;
522                                for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
523                                        .hasNext();) {
524                                    iterator.next();
525                                    rc++;
526                                }
527                                return rc;
528                            }
529                        });
530                    }finally {
531                        indexLock.writeLock().unlock();
532                    }
533                } finally {
534                    unlockAsyncJobQueue();
535                }
536            }
537    
538            @Override
539            public boolean isEmpty() throws IOException {
540                indexLock.writeLock().lock();
541                try {
542                    return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
543                        @Override
544                        public Boolean execute(Transaction tx) throws IOException {
545                            // Iterate through all index entries to get a count of
546                            // messages in the destination.
547                            StoredDestination sd = getStoredDestination(dest, tx);
548                            return sd.locationIndex.isEmpty(tx);
549                        }
550                    });
551                }finally {
552                    indexLock.writeLock().unlock();
553                }
554            }
555    
556            @Override
557            public void recover(final MessageRecoveryListener listener) throws Exception {
558                // recovery may involve expiry which will modify
559                indexLock.writeLock().lock();
560                try {
561                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
562                        @Override
563                        public void execute(Transaction tx) throws Exception {
564                            StoredDestination sd = getStoredDestination(dest, tx);
565                            sd.orderIndex.resetCursorPosition();
566                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
567                                    .hasNext(); ) {
568                                Entry<Long, MessageKeys> entry = iterator.next();
569                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
570                                    continue;
571                                }
572                                Message msg = loadMessage(entry.getValue().location);
573                                listener.recoverMessage(msg);
574                            }
575                        }
576                    });
577                }finally {
578                    indexLock.writeLock().unlock();
579                }
580            }
581    
582    
583            @Override
584            public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
585                indexLock.writeLock().lock();
586                try {
587                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
588                        @Override
589                        public void execute(Transaction tx) throws Exception {
590                            StoredDestination sd = getStoredDestination(dest, tx);
591                            Entry<Long, MessageKeys> entry = null;
592                            int counter = 0;
593                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
594                                entry = iterator.next();
595                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
596                                    continue;
597                                }
598                                Message msg = loadMessage(entry.getValue().location);
599                                listener.recoverMessage(msg);
600                                counter++;
601                                if (counter >= maxReturned) {
602                                    break;
603                                }
604                            }
605                            sd.orderIndex.stoppedIterating();
606                        }
607                    });
608                }finally {
609                    indexLock.writeLock().unlock();
610                }
611            }
612    
613            @Override
614            public void resetBatching() {
615                if (pageFile.isLoaded()) {
616                    indexLock.writeLock().lock();
617                    try {
618                        pageFile.tx().execute(new Transaction.Closure<Exception>() {
619                            @Override
620                            public void execute(Transaction tx) throws Exception {
621                                StoredDestination sd = getExistingStoredDestination(dest, tx);
622                                if (sd != null) {
623                                    sd.orderIndex.resetCursorPosition();}
624                                }
625                            });
626                    } catch (Exception e) {
627                        LOG.error("Failed to reset batching",e);
628                    }finally {
629                        indexLock.writeLock().unlock();
630                    }
631                }
632            }
633    
634            @Override
635            public void setBatch(MessageId identity) throws IOException {
636                try {
637                    final String key = identity.toProducerKey();
638                    lockAsyncJobQueue();
639    
640                    // Hopefully one day the page file supports concurrent read
641                    // operations... but for now we must
642                    // externally synchronize...
643    
644                    indexLock.writeLock().lock();
645                    try {
646                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
647                            @Override
648                            public void execute(Transaction tx) throws IOException {
649                                StoredDestination sd = getStoredDestination(dest, tx);
650                                Long location = sd.messageIdIndex.get(tx, key);
651                                if (location != null) {
652                                    sd.orderIndex.setBatch(tx, location);
653                                }
654                            }
655                        });
656                    } finally {
657                        indexLock.writeLock().unlock();
658                    }
659                } finally {
660                    unlockAsyncJobQueue();
661                }
662            }
663    
664            @Override
665            public void setMemoryUsage(MemoryUsage memoeyUSage) {
666            }
667            @Override
668            public void start() throws Exception {
669                super.start();
670            }
671            @Override
672            public void stop() throws Exception {
673                super.stop();
674            }
675    
676            protected void lockAsyncJobQueue() {
677                try {
678                    this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
679                } catch (Exception e) {
680                    LOG.error("Failed to lock async jobs for " + this.destination, e);
681                }
682            }
683    
684            protected void unlockAsyncJobQueue() {
685                this.localDestinationSemaphore.release(this.maxAsyncJobs);
686            }
687    
688            protected void acquireLocalAsyncLock() {
689                try {
690                    this.localDestinationSemaphore.acquire();
691                } catch (InterruptedException e) {
692                    LOG.error("Failed to aquire async lock for " + this.destination, e);
693                }
694            }
695    
696            protected void releaseLocalAsyncLock() {
697                this.localDestinationSemaphore.release();
698            }
699    
700        }
701    
702        class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
703            private final AtomicInteger subscriptionCount = new AtomicInteger();
704            public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
705                super(destination);
706                this.subscriptionCount.set(getAllSubscriptions().length);
707                asyncTopicMaps.add(asyncTaskMap);
708            }
709    
710            @Override
711            public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
712                    throws IOException {
713                if (isConcurrentStoreAndDispatchTopics()) {
714                    StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
715                    result.aquireLocks();
716                    addTopicTask(this, result);
717                    return result.getFuture();
718                } else {
719                    return super.asyncAddTopicMessage(context, message);
720                }
721            }
722    
723            @Override
724            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
725                                    MessageId messageId, MessageAck ack)
726                    throws IOException {
727                String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
728                if (isConcurrentStoreAndDispatchTopics()) {
729                    AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
730                    StoreTopicTask task = null;
731                    synchronized (asyncTaskMap) {
732                        task = (StoreTopicTask) asyncTaskMap.get(key);
733                    }
734                    if (task != null) {
735                        if (task.addSubscriptionKey(subscriptionKey)) {
736                            removeTopicTask(this, messageId);
737                            if (task.cancel()) {
738                                synchronized (asyncTaskMap) {
739                                    asyncTaskMap.remove(key);
740                                }
741                            }
742                        }
743                    } else {
744                        doAcknowledge(context, subscriptionKey, messageId, ack);
745                    }
746                } else {
747                    doAcknowledge(context, subscriptionKey, messageId, ack);
748                }
749            }
750    
751            protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
752                    throws IOException {
753                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
754                command.setDestination(dest);
755                command.setSubscriptionKey(subscriptionKey);
756                command.setMessageId(messageId.toProducerKey());
757                command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
758                if (ack != null && ack.isUnmatchedAck()) {
759                    command.setAck(UNMATCHED);
760                } else {
761                    org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
762                    command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
763                }
764                store(command, false, null, null);
765            }
766    
767            @Override
768            public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
769                String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
770                        .getSubscriptionName());
771                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
772                command.setDestination(dest);
773                command.setSubscriptionKey(subscriptionKey.toString());
774                command.setRetroactive(retroactive);
775                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
776                command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
777                store(command, isEnableJournalDiskSyncs() && true, null, null);
778                this.subscriptionCount.incrementAndGet();
779            }
780    
781            @Override
782            public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
783                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
784                command.setDestination(dest);
785                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
786                store(command, isEnableJournalDiskSyncs() && true, null, null);
787                this.subscriptionCount.decrementAndGet();
788            }
789    
790            @Override
791            public SubscriptionInfo[] getAllSubscriptions() throws IOException {
792    
793                final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
794                indexLock.writeLock().lock();
795                try {
796                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
797                        @Override
798                        public void execute(Transaction tx) throws IOException {
799                            StoredDestination sd = getStoredDestination(dest, tx);
800                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
801                                    .hasNext();) {
802                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
803                                SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
804                                        .getValue().getSubscriptionInfo().newInput()));
805                                subscriptions.add(info);
806    
807                            }
808                        }
809                    });
810                }finally {
811                    indexLock.writeLock().unlock();
812                }
813    
814                SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
815                subscriptions.toArray(rc);
816                return rc;
817            }
818    
819            @Override
820            public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
821                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
822                indexLock.writeLock().lock();
823                try {
824                    return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
825                        @Override
826                        public SubscriptionInfo execute(Transaction tx) throws IOException {
827                            StoredDestination sd = getStoredDestination(dest, tx);
828                            KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
829                            if (command == null) {
830                                return null;
831                            }
832                            return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
833                                    .getSubscriptionInfo().newInput()));
834                        }
835                    });
836                }finally {
837                    indexLock.writeLock().unlock();
838                }
839            }
840    
841            @Override
842            public int getMessageCount(String clientId, String subscriptionName) throws IOException {
843                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
844                indexLock.writeLock().lock();
845                try {
846                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
847                        @Override
848                        public Integer execute(Transaction tx) throws IOException {
849                            StoredDestination sd = getStoredDestination(dest, tx);
850                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
851                            if (cursorPos == null) {
852                                // The subscription might not exist.
853                                return 0;
854                            }
855    
856                            return (int) getStoredMessageCount(tx, sd, subscriptionKey);
857                        }
858                    });
859                }finally {
860                    indexLock.writeLock().unlock();
861                }
862            }
863    
864            @Override
865            public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
866                    throws Exception {
867                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
868                @SuppressWarnings("unused")
869                final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
870                indexLock.writeLock().lock();
871                try {
872                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
873                        @Override
874                        public void execute(Transaction tx) throws Exception {
875                            StoredDestination sd = getStoredDestination(dest, tx);
876                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
877                            sd.orderIndex.setBatch(tx, cursorPos);
878                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
879                                    .hasNext();) {
880                                Entry<Long, MessageKeys> entry = iterator.next();
881                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
882                                    continue;
883                                }
884                                listener.recoverMessage(loadMessage(entry.getValue().location));
885                            }
886                            sd.orderIndex.resetCursorPosition();
887                        }
888                    });
889                }finally {
890                    indexLock.writeLock().unlock();
891                }
892            }
893    
894            @Override
895            public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
896                    final MessageRecoveryListener listener) throws Exception {
897                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
898                @SuppressWarnings("unused")
899                final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
900                indexLock.writeLock().lock();
901                try {
902                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
903                        @Override
904                        public void execute(Transaction tx) throws Exception {
905                            StoredDestination sd = getStoredDestination(dest, tx);
906                            sd.orderIndex.resetCursorPosition();
907                            MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
908                            if (moc == null) {
909                                LastAck pos = getLastAck(tx, sd, subscriptionKey);
910                                if (pos == null) {
911                                    // sub deleted
912                                    return;
913                                }
914                                sd.orderIndex.setBatch(tx, pos);
915                                moc = sd.orderIndex.cursor;
916                            } else {
917                                sd.orderIndex.cursor.sync(moc);
918                            }
919    
920                            Entry<Long, MessageKeys> entry = null;
921                            int counter = 0;
922                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
923                                    .hasNext();) {
924                                entry = iterator.next();
925                                if (ackedAndPrepared.contains(entry.getValue().messageId)) {
926                                    continue;
927                                }
928                                if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
929                                    counter++;
930                                }
931                                if (counter >= maxReturned || listener.hasSpace() == false) {
932                                    break;
933                                }
934                            }
935                            sd.orderIndex.stoppedIterating();
936                            if (entry != null) {
937                                MessageOrderCursor copy = sd.orderIndex.cursor.copy();
938                                sd.subscriptionCursors.put(subscriptionKey, copy);
939                            }
940                        }
941                    });
942                }finally {
943                    indexLock.writeLock().unlock();
944                }
945            }
946    
947            @Override
948            public void resetBatching(String clientId, String subscriptionName) {
949                try {
950                    final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
951                    indexLock.writeLock().lock();
952                    try {
953                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
954                            @Override
955                            public void execute(Transaction tx) throws IOException {
956                                StoredDestination sd = getStoredDestination(dest, tx);
957                                sd.subscriptionCursors.remove(subscriptionKey);
958                            }
959                        });
960                    }finally {
961                        indexLock.writeLock().unlock();
962                    }
963                } catch (IOException e) {
964                    throw new RuntimeException(e);
965                }
966            }
967        }
968    
969        String subscriptionKey(String clientId, String subscriptionName) {
970            return clientId + ":" + subscriptionName;
971        }
972    
973        @Override
974        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
975            return this.transactionStore.proxy(new KahaDBMessageStore(destination));
976        }
977    
978        @Override
979        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
980            return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
981        }
982    
983        /**
984         * Cleanup method to remove any state associated with the given destination.
985         * This method does not stop the message store (it might not be cached).
986         *
987         * @param destination
988         *            Destination to forget
989         */
990        @Override
991        public void removeQueueMessageStore(ActiveMQQueue destination) {
992        }
993    
994        /**
995         * Cleanup method to remove any state associated with the given destination
996         * This method does not stop the message store (it might not be cached).
997         *
998         * @param destination
999         *            Destination to forget
1000         */
1001        @Override
1002        public void removeTopicMessageStore(ActiveMQTopic destination) {
1003        }
1004    
1005        @Override
1006        public void deleteAllMessages() throws IOException {
1007            deleteAllMessages = true;
1008        }
1009    
1010        @Override
1011        public Set<ActiveMQDestination> getDestinations() {
1012            try {
1013                final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1014                indexLock.writeLock().lock();
1015                try {
1016                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1017                        @Override
1018                        public void execute(Transaction tx) throws IOException {
1019                            for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1020                                    .hasNext();) {
1021                                Entry<String, StoredDestination> entry = iterator.next();
1022                                if (!isEmptyTopic(entry, tx)) {
1023                                    rc.add(convert(entry.getKey()));
1024                                }
1025                            }
1026                        }
1027    
1028                        private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
1029                                throws IOException {
1030                            boolean isEmptyTopic = false;
1031                            ActiveMQDestination dest = convert(entry.getKey());
1032                            if (dest.isTopic()) {
1033                                StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
1034                                if (loadedStore.subscriptionAcks.isEmpty(tx)) {
1035                                    isEmptyTopic = true;
1036                                }
1037                            }
1038                            return isEmptyTopic;
1039                        }
1040                    });
1041                }finally {
1042                    indexLock.writeLock().unlock();
1043                }
1044                return rc;
1045            } catch (IOException e) {
1046                throw new RuntimeException(e);
1047            }
1048        }
1049    
1050        @Override
1051        public long getLastMessageBrokerSequenceId() throws IOException {
1052            return 0;
1053        }
1054    
1055        @Override
1056        public long getLastProducerSequenceId(ProducerId id) {
1057            indexLock.readLock().lock();
1058            try {
1059                return metadata.producerSequenceIdTracker.getLastSeqId(id);
1060            } finally {
1061                indexLock.readLock().unlock();
1062            }
1063        }
1064    
1065        @Override
1066        public long size() {
1067            try {
1068                return journalSize.get() + getPageFile().getDiskSize();
1069            } catch (IOException e) {
1070                throw new RuntimeException(e);
1071            }
1072        }
1073    
1074        @Override
1075        public void beginTransaction(ConnectionContext context) throws IOException {
1076            throw new IOException("Not yet implemented.");
1077        }
1078        @Override
1079        public void commitTransaction(ConnectionContext context) throws IOException {
1080            throw new IOException("Not yet implemented.");
1081        }
1082        @Override
1083        public void rollbackTransaction(ConnectionContext context) throws IOException {
1084            throw new IOException("Not yet implemented.");
1085        }
1086    
1087        @Override
1088        public void checkpoint(boolean sync) throws IOException {
1089            super.checkpointCleanup(sync);
1090        }
1091    
1092        // /////////////////////////////////////////////////////////////////
1093        // Internal helper methods.
1094        // /////////////////////////////////////////////////////////////////
1095    
1096        /**
1097         * @param location
1098         * @return
1099         * @throws IOException
1100         */
1101        Message loadMessage(Location location) throws IOException {
1102            KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
1103            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1104            return msg;
1105        }
1106    
1107        // /////////////////////////////////////////////////////////////////
1108        // Internal conversion methods.
1109        // /////////////////////////////////////////////////////////////////
1110    
1111        KahaLocation convert(Location location) {
1112            KahaLocation rc = new KahaLocation();
1113            rc.setLogId(location.getDataFileId());
1114            rc.setOffset(location.getOffset());
1115            return rc;
1116        }
1117    
1118        KahaDestination convert(ActiveMQDestination dest) {
1119            KahaDestination rc = new KahaDestination();
1120            rc.setName(dest.getPhysicalName());
1121            switch (dest.getDestinationType()) {
1122            case ActiveMQDestination.QUEUE_TYPE:
1123                rc.setType(DestinationType.QUEUE);
1124                return rc;
1125            case ActiveMQDestination.TOPIC_TYPE:
1126                rc.setType(DestinationType.TOPIC);
1127                return rc;
1128            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1129                rc.setType(DestinationType.TEMP_QUEUE);
1130                return rc;
1131            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1132                rc.setType(DestinationType.TEMP_TOPIC);
1133                return rc;
1134            default:
1135                return null;
1136            }
1137        }
1138    
1139        ActiveMQDestination convert(String dest) {
1140            int p = dest.indexOf(":");
1141            if (p < 0) {
1142                throw new IllegalArgumentException("Not in the valid destination format");
1143            }
1144            int type = Integer.parseInt(dest.substring(0, p));
1145            String name = dest.substring(p + 1);
1146            return convert(type, name);
1147        }
1148    
1149        private ActiveMQDestination convert(KahaDestination commandDestination) {
1150            return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1151        }
1152    
1153        private ActiveMQDestination convert(int type, String name) {
1154            switch (KahaDestination.DestinationType.valueOf(type)) {
1155            case QUEUE:
1156                return new ActiveMQQueue(name);
1157            case TOPIC:
1158                return new ActiveMQTopic(name);
1159            case TEMP_QUEUE:
1160                return new ActiveMQTempQueue(name);
1161            case TEMP_TOPIC:
1162                return new ActiveMQTempTopic(name);
1163            default:
1164                throw new IllegalArgumentException("Not in the valid destination format");
1165            }
1166        }
1167    
1168        public TransactionIdTransformer getTransactionIdTransformer() {
1169            return transactionIdTransformer;
1170        }
1171    
1172        public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1173            this.transactionIdTransformer = transactionIdTransformer;
1174        }
1175    
1176        static class AsyncJobKey {
1177            MessageId id;
1178            ActiveMQDestination destination;
1179    
1180            AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1181                this.id = id;
1182                this.destination = destination;
1183            }
1184    
1185            @Override
1186            public boolean equals(Object obj) {
1187                if (obj == this) {
1188                    return true;
1189                }
1190                return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1191                        && destination.equals(((AsyncJobKey) obj).destination);
1192            }
1193    
1194            @Override
1195            public int hashCode() {
1196                return id.hashCode() + destination.hashCode();
1197            }
1198    
1199            @Override
1200            public String toString() {
1201                return destination.getPhysicalName() + "-" + id;
1202            }
1203        }
1204    
1205        public interface StoreTask {
1206            public boolean cancel();
1207    
1208            public void aquireLocks();
1209    
1210            public void releaseLocks();
1211        }
1212    
1213        class StoreQueueTask implements Runnable, StoreTask {
1214            protected final Message message;
1215            protected final ConnectionContext context;
1216            protected final KahaDBMessageStore store;
1217            protected final InnerFutureTask future;
1218            protected final AtomicBoolean done = new AtomicBoolean();
1219            protected final AtomicBoolean locked = new AtomicBoolean();
1220    
1221            public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1222                this.store = store;
1223                this.context = context;
1224                this.message = message;
1225                this.future = new InnerFutureTask(this);
1226            }
1227    
1228            public Future<Object> getFuture() {
1229                return this.future;
1230            }
1231    
1232            @Override
1233            public boolean cancel() {
1234                if (this.done.compareAndSet(false, true)) {
1235                    return this.future.cancel(false);
1236                }
1237                return false;
1238            }
1239    
1240            @Override
1241            public void aquireLocks() {
1242                if (this.locked.compareAndSet(false, true)) {
1243                    try {
1244                        globalQueueSemaphore.acquire();
1245                        store.acquireLocalAsyncLock();
1246                        message.incrementReferenceCount();
1247                    } catch (InterruptedException e) {
1248                        LOG.warn("Failed to aquire lock", e);
1249                    }
1250                }
1251    
1252            }
1253    
1254            @Override
1255            public void releaseLocks() {
1256                if (this.locked.compareAndSet(true, false)) {
1257                    store.releaseLocalAsyncLock();
1258                    globalQueueSemaphore.release();
1259                    message.decrementReferenceCount();
1260                }
1261            }
1262    
1263            @Override
1264            public void run() {
1265                this.store.doneTasks++;
1266                try {
1267                    if (this.done.compareAndSet(false, true)) {
1268                        this.store.addMessage(context, message);
1269                        removeQueueTask(this.store, this.message.getMessageId());
1270                        this.future.complete();
1271                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1272                        System.err.println(this.store.dest.getName() + " cancelled: "
1273                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1274                        this.store.canceledTasks = this.store.doneTasks = 0;
1275                    }
1276                } catch (Exception e) {
1277                    this.future.setException(e);
1278                }
1279            }
1280    
1281            protected Message getMessage() {
1282                return this.message;
1283            }
1284    
1285            private class InnerFutureTask extends FutureTask<Object> {
1286    
1287                public InnerFutureTask(Runnable runnable) {
1288                    super(runnable, null);
1289    
1290                }
1291    
1292                public void setException(final Exception e) {
1293                    super.setException(e);
1294                }
1295    
1296                public void complete() {
1297                    super.set(null);
1298                }
1299            }
1300        }
1301    
1302        class StoreTopicTask extends StoreQueueTask {
1303            private final int subscriptionCount;
1304            private final List<String> subscriptionKeys = new ArrayList<String>(1);
1305            private final KahaDBTopicMessageStore topicStore;
1306            public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1307                    int subscriptionCount) {
1308                super(store, context, message);
1309                this.topicStore = store;
1310                this.subscriptionCount = subscriptionCount;
1311    
1312            }
1313    
1314            @Override
1315            public void aquireLocks() {
1316                if (this.locked.compareAndSet(false, true)) {
1317                    try {
1318                        globalTopicSemaphore.acquire();
1319                        store.acquireLocalAsyncLock();
1320                        message.incrementReferenceCount();
1321                    } catch (InterruptedException e) {
1322                        LOG.warn("Failed to aquire lock", e);
1323                    }
1324                }
1325    
1326            }
1327    
1328            @Override
1329            public void releaseLocks() {
1330                if (this.locked.compareAndSet(true, false)) {
1331                    message.decrementReferenceCount();
1332                    store.releaseLocalAsyncLock();
1333                    globalTopicSemaphore.release();
1334                }
1335            }
1336    
1337            /**
1338             * add a key
1339             *
1340             * @param key
1341             * @return true if all acknowledgements received
1342             */
1343            public boolean addSubscriptionKey(String key) {
1344                synchronized (this.subscriptionKeys) {
1345                    this.subscriptionKeys.add(key);
1346                }
1347                return this.subscriptionKeys.size() >= this.subscriptionCount;
1348            }
1349    
1350            @Override
1351            public void run() {
1352                this.store.doneTasks++;
1353                try {
1354                    if (this.done.compareAndSet(false, true)) {
1355                        this.topicStore.addMessage(context, message);
1356                        // apply any acks we have
1357                        synchronized (this.subscriptionKeys) {
1358                            for (String key : this.subscriptionKeys) {
1359                                this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1360    
1361                            }
1362                        }
1363                        removeTopicTask(this.topicStore, this.message.getMessageId());
1364                        this.future.complete();
1365                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1366                        System.err.println(this.store.dest.getName() + " cancelled: "
1367                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1368                        this.store.canceledTasks = this.store.doneTasks = 0;
1369                    }
1370                } catch (Exception e) {
1371                    this.future.setException(e);
1372                }
1373            }
1374        }
1375    
1376        public class StoreTaskExecutor extends ThreadPoolExecutor {
1377    
1378            public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1379                super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1380            }
1381    
1382            @Override
1383            protected void afterExecute(Runnable runnable, Throwable throwable) {
1384                super.afterExecute(runnable, throwable);
1385    
1386                if (runnable instanceof StoreTask) {
1387                   ((StoreTask)runnable).releaseLocks();
1388                }
1389    
1390            }
1391        }
1392    }