001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.concurrent.atomic.AtomicReference;
044
045import org.apache.activemq.broker.ConnectionContext;
046import org.apache.activemq.broker.region.BaseDestination;
047import org.apache.activemq.broker.scheduler.JobSchedulerStore;
048import org.apache.activemq.command.ActiveMQDestination;
049import org.apache.activemq.command.ActiveMQQueue;
050import org.apache.activemq.command.ActiveMQTempQueue;
051import org.apache.activemq.command.ActiveMQTempTopic;
052import org.apache.activemq.command.ActiveMQTopic;
053import org.apache.activemq.command.Message;
054import org.apache.activemq.command.MessageAck;
055import org.apache.activemq.command.MessageId;
056import org.apache.activemq.command.ProducerId;
057import org.apache.activemq.command.SubscriptionInfo;
058import org.apache.activemq.command.TransactionId;
059import org.apache.activemq.openwire.OpenWireFormat;
060import org.apache.activemq.protobuf.Buffer;
061import org.apache.activemq.store.AbstractMessageStore;
062import org.apache.activemq.store.IndexListener;
063import org.apache.activemq.store.ListenableFuture;
064import org.apache.activemq.store.MessageRecoveryListener;
065import org.apache.activemq.store.MessageStore;
066import org.apache.activemq.store.MessageStoreStatistics;
067import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
068import org.apache.activemq.store.NoLocalSubscriptionAware;
069import org.apache.activemq.store.PersistenceAdapter;
070import org.apache.activemq.store.TopicMessageStore;
071import org.apache.activemq.store.TransactionIdTransformer;
072import org.apache.activemq.store.TransactionStore;
073import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
074import org.apache.activemq.store.kahadb.data.KahaDestination;
075import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
076import org.apache.activemq.store.kahadb.data.KahaLocation;
077import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
078import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
079import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
080import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
081import org.apache.activemq.store.kahadb.disk.journal.Location;
082import org.apache.activemq.store.kahadb.disk.page.Transaction;
083import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
084import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
085import org.apache.activemq.usage.MemoryUsage;
086import org.apache.activemq.usage.SystemUsage;
087import org.apache.activemq.util.IOExceptionSupport;
088import org.apache.activemq.util.ServiceStopper;
089import org.apache.activemq.util.ThreadPoolUtils;
090import org.apache.activemq.wireformat.WireFormat;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware {
    // Logger for this store.
    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
    // Default value of maxAsyncJobs, taken from BaseDestination.MAX_AUDIT_DEPTH.
    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;

    // Name of the system property that supplies cancelledTaskModMetric.
    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
    // Base-10 integer value of PROPERTY_CANCELED_TASK_MOD_METRIC, read once at class load; "0" when unset.
    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
    // Name of the system property that supplies asyncExecutorMaxThreads.
    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
    // Base-10 integer value of PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, read once at class load; "1" when unset.
    // Passed as the second constructor argument of both StoreTaskExecutor instances created in doStart().
    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;

    // Executors that run the async store tasks for queues and topics; created in doStart(),
    // shut down and set to null in doStop().
    protected ExecutorService queueExecutor;
    protected ExecutorService topicExecutor;
    // Per-store task maps; doStop() iterates them to cancel every outstanding task and then clears them.
    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    // Wire format used to marshal messages and acks before they are journaled; its version is
    // set from metadata.openwireVersion in configureMetadata()/doStart().
    final WireFormat wireFormat = new OpenWireFormat();
    // Set through setUsageManager(); not otherwise used in the visible part of this class.
    private SystemUsage usageManager;
    // Bounded work queues (capacity getMaxAsyncJobs()) backing queueExecutor and topicExecutor; created in doStart().
    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
    // Semaphores created in doStart() with getMaxAsyncJobs() permits; doStop() tries to take all
    // permits (up to 60 seconds) before cancelling tasks, then drains them.
    Semaphore globalQueueSemaphore;
    Semaphore globalTopicSemaphore;
    // When true, queue messages are added/removed through the async task path (see
    // KahaDBMessageStore.asyncAddQueueMessage / removeAsyncMessage).
    private boolean concurrentStoreAndDispatchQueues = true;
    // when true, message order may be compromised when cache is exhausted if store is out
    // of order w.r.t cache
    private boolean concurrentStoreAndDispatchTopics = false;
    // Fixed at false; exposed read-only through isConcurrentStoreAndDispatchTransactions().
    private final boolean concurrentStoreAndDispatchTransactions = false;
    // Permit count / queue capacity used when doStart() builds the semaphores and job queues.
    private int maxAsyncJobs = MAX_ASYNC_JOBS;
    // Transaction store bound to this store instance; returned by createTransactionStore().
    private final KahaDBTransactionStore transactionStore;
    // Maps transaction ids before they are converted for the journal; defaults to identity (see constructor).
    private TransactionIdTransformer transactionIdTransformer;
123
124    public KahaDBStore() {
125        this.transactionStore = new KahaDBTransactionStore(this);
126        this.transactionIdTransformer = new TransactionIdTransformer() {
127            @Override
128            public TransactionId transform(TransactionId txid) {
129                return txid;
130            }
131        };
132    }
133
134    @Override
135    public String toString() {
136        return "KahaDB:[" + directory.getAbsolutePath() + "]";
137    }
138
139    @Override
140    public void setBrokerName(String brokerName) {
141    }
142
143    @Override
144    public void setUsageManager(SystemUsage usageManager) {
145        this.usageManager = usageManager;
146    }
147
148    public SystemUsage getUsageManager() {
149        return this.usageManager;
150    }
151
152    /**
153     * @return the concurrentStoreAndDispatch
154     */
155    public boolean isConcurrentStoreAndDispatchQueues() {
156        return this.concurrentStoreAndDispatchQueues;
157    }
158
159    /**
160     * @param concurrentStoreAndDispatch
161     *            the concurrentStoreAndDispatch to set
162     */
163    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
164        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
165    }
166
167    /**
168     * @return the concurrentStoreAndDispatch
169     */
170    public boolean isConcurrentStoreAndDispatchTopics() {
171        return this.concurrentStoreAndDispatchTopics;
172    }
173
174    /**
175     * @param concurrentStoreAndDispatch
176     *            the concurrentStoreAndDispatch to set
177     */
178    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
179        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
180    }
181
182    public boolean isConcurrentStoreAndDispatchTransactions() {
183        return this.concurrentStoreAndDispatchTransactions;
184    }
185
186    /**
187     * @return the maxAsyncJobs
188     */
189    public int getMaxAsyncJobs() {
190        return this.maxAsyncJobs;
191    }
192
193    /**
194     * @param maxAsyncJobs
195     *            the maxAsyncJobs to set
196     */
197    public void setMaxAsyncJobs(int maxAsyncJobs) {
198        this.maxAsyncJobs = maxAsyncJobs;
199    }
200
201
202    @Override
203    protected void configureMetadata() {
204        if (brokerService != null) {
205            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
206            wireFormat.setVersion(metadata.openwireVersion);
207
208            if (LOG.isDebugEnabled()) {
209                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
210            }
211
212        }
213    }
214
215    @Override
216    public void doStart() throws Exception {
217        //configure the metadata before start, right now
218        //this is just the open wire version
219        configureMetadata();
220
221        super.doStart();
222
223        if (brokerService != null) {
224            // In case the recovered store used a different OpenWire version log a warning
225            // to assist in determining why journal reads fail.
226            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
227                LOG.warn("Existing Store uses a different OpenWire version[{}] " +
228                         "than the version configured[{}] reverting to the version " +
229                         "used by this store, some newer broker features may not work" +
230                         "as expected.",
231                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
232
233                // Update the broker service instance to the actual version in use.
234                wireFormat.setVersion(metadata.openwireVersion);
235                brokerService.setStoreOpenWireVersion(metadata.openwireVersion);
236            }
237        }
238
239        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
240        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
241        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
242        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
243        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
244            asyncQueueJobQueue, new ThreadFactory() {
245                @Override
246                public Thread newThread(Runnable runnable) {
247                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
248                    thread.setDaemon(true);
249                    return thread;
250                }
251            });
252        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
253            asyncTopicJobQueue, new ThreadFactory() {
254                @Override
255                public Thread newThread(Runnable runnable) {
256                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
257                    thread.setDaemon(true);
258                    return thread;
259                }
260            });
261    }
262
263    @Override
264    public void doStop(ServiceStopper stopper) throws Exception {
265        // drain down async jobs
266        LOG.info("Stopping async queue tasks");
267        if (this.globalQueueSemaphore != null) {
268            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
269        }
270        synchronized (this.asyncQueueMaps) {
271            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
272                synchronized (m) {
273                    for (StoreTask task : m.values()) {
274                        task.cancel();
275                    }
276                }
277            }
278            this.asyncQueueMaps.clear();
279        }
280        LOG.info("Stopping async topic tasks");
281        if (this.globalTopicSemaphore != null) {
282            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
283        }
284        synchronized (this.asyncTopicMaps) {
285            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
286                synchronized (m) {
287                    for (StoreTask task : m.values()) {
288                        task.cancel();
289                    }
290                }
291            }
292            this.asyncTopicMaps.clear();
293        }
294        if (this.globalQueueSemaphore != null) {
295            this.globalQueueSemaphore.drainPermits();
296        }
297        if (this.globalTopicSemaphore != null) {
298            this.globalTopicSemaphore.drainPermits();
299        }
300        if (this.queueExecutor != null) {
301            ThreadPoolUtils.shutdownNow(queueExecutor);
302            queueExecutor = null;
303        }
304        if (this.topicExecutor != null) {
305            ThreadPoolUtils.shutdownNow(topicExecutor);
306            topicExecutor = null;
307        }
308        LOG.info("Stopped KahaDB");
309        super.doStop(stopper);
310    }
311
312    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
313        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
314            @Override
315            public Location execute(Transaction tx) throws IOException {
316                StoredDestination sd = getStoredDestination(destination, tx);
317                Long sequence = sd.messageIdIndex.get(tx, key);
318                if (sequence == null) {
319                    return null;
320                }
321                return sd.orderIndex.get(tx, sequence).location;
322            }
323        });
324    }
325
326    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
327        StoreQueueTask task = null;
328        synchronized (store.asyncTaskMap) {
329            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
330        }
331        return task;
332    }
333
334    // with asyncTaskMap locked
335    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
336        store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
337        this.queueExecutor.execute(task);
338    }
339
340    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
341        StoreTopicTask task = null;
342        synchronized (store.asyncTaskMap) {
343            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
344        }
345        return task;
346    }
347
348    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
349        synchronized (store.asyncTaskMap) {
350            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
351        }
352        this.topicExecutor.execute(task);
353    }
354
355    @Override
356    public TransactionStore createTransactionStore() throws IOException {
357        return this.transactionStore;
358    }
359
360    public boolean getForceRecoverIndex() {
361        return this.forceRecoverIndex;
362    }
363
364    public void setForceRecoverIndex(boolean forceRecoverIndex) {
365        this.forceRecoverIndex = forceRecoverIndex;
366    }
367
368    public class KahaDBMessageStore extends AbstractMessageStore {
        // Outstanding async store tasks for this destination, keyed by (message id, destination).
        // Readers and writers synchronize on the map itself.
        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
        // KahaDB form of the destination, produced by convert(destination) in the constructor.
        protected KahaDestination dest;
        // Permit count of localDestinationSemaphore, captured from getMaxAsyncJobs() at construction.
        private final int maxAsyncJobs;
        // Per-destination semaphore: acquireLocalAsyncLock() takes one permit, lockAsyncJobQueue()
        // tries to take all of them.
        private final Semaphore localDestinationSemaphore;

        // Not updated in the visible part of this class; presumably counters of completed and
        // cancelled async tasks — TODO confirm against the task classes.
        double doneTasks, canceledTasks = 0;
375
376        public KahaDBMessageStore(ActiveMQDestination destination) {
377            super(destination);
378            this.dest = convert(destination);
379            this.maxAsyncJobs = getMaxAsyncJobs();
380            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
381        }
382
383        @Override
384        public ActiveMQDestination getDestination() {
385            return destination;
386        }
387
388        @Override
389        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
390                throws IOException {
391            if (isConcurrentStoreAndDispatchQueues()) {
392                message.beforeMarshall(wireFormat);
393                StoreQueueTask result = new StoreQueueTask(this, context, message);
394                ListenableFuture<Object> future = result.getFuture();
395                message.getMessageId().setFutureOrSequenceLong(future);
396                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
397                result.aquireLocks();
398                synchronized (asyncTaskMap) {
399                    addQueueTask(this, result);
400                    if (indexListener != null) {
401                        indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
402                    }
403                }
404                return future;
405            } else {
406                return super.asyncAddQueueMessage(context, message);
407            }
408        }
409
410        @Override
411        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
412            if (isConcurrentStoreAndDispatchQueues()) {
413                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
414                StoreQueueTask task = null;
415                synchronized (asyncTaskMap) {
416                    task = (StoreQueueTask) asyncTaskMap.get(key);
417                }
418                if (task != null) {
419                    if (ack.isInTransaction() || !task.cancel()) {
420                        try {
421                            task.future.get();
422                        } catch (InterruptedException e) {
423                            throw new InterruptedIOException(e.toString());
424                        } catch (Exception ignored) {
425                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
426                        }
427                        removeMessage(context, ack);
428                    } else {
429                        indexLock.writeLock().lock();
430                        try {
431                            metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
432                        } finally {
433                            indexLock.writeLock().unlock();
434                        }
435                        synchronized (asyncTaskMap) {
436                            asyncTaskMap.remove(key);
437                        }
438                    }
439                } else {
440                    removeMessage(context, ack);
441                }
442            } else {
443                removeMessage(context, ack);
444            }
445        }
446
447        @Override
448        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
449            final KahaAddMessageCommand command = new KahaAddMessageCommand();
450            command.setDestination(dest);
451            command.setMessageId(message.getMessageId().toProducerKey());
452            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
453            command.setPriority(message.getPriority());
454            command.setPrioritySupported(isPrioritizedMessages());
455            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
456            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
457            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
458                // sync add? (for async, future present from getFutureOrSequenceLong)
459                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
460
461                @Override
462                public void sequenceAssignedWithIndexLocked(final long sequence) {
463                    message.getMessageId().setFutureOrSequenceLong(sequence);
464                    if (indexListener != null) {
465                        if (possibleFuture == null) {
466                            trackPendingAdd(dest, sequence);
467                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
468                                @Override
469                                public void run() {
470                                    trackPendingAddComplete(dest, sequence);
471                                }
472                            }));
473                        }
474                    }
475                }
476            }, null);
477        }
478
479        @Override
480        public void updateMessage(Message message) throws IOException {
481            if (LOG.isTraceEnabled()) {
482                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
483            }
484            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
485            KahaAddMessageCommand command = new KahaAddMessageCommand();
486            command.setDestination(dest);
487            command.setMessageId(message.getMessageId().toProducerKey());
488            command.setPriority(message.getPriority());
489            command.setPrioritySupported(prioritizedMessages);
490            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
491            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
492            updateMessageCommand.setMessage(command);
493            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
494        }
495
496        @Override
497        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
498            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
499            command.setDestination(dest);
500            command.setMessageId(ack.getLastMessageId().toProducerKey());
501            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
502
503            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
504            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
505            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
506        }
507
508        @Override
509        public void removeAllMessages(ConnectionContext context) throws IOException {
510            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
511            command.setDestination(dest);
512            store(command, true, null, null);
513        }
514
515        @Override
516        public Message getMessage(MessageId identity) throws IOException {
517            final String key = identity.toProducerKey();
518
519            // Hopefully one day the page file supports concurrent read
520            // operations... but for now we must
521            // externally synchronize...
522            Location location;
523            indexLock.writeLock().lock();
524            try {
525                location = findMessageLocation(key, dest);
526            } finally {
527                indexLock.writeLock().unlock();
528            }
529            if (location == null) {
530                return null;
531            }
532
533            return loadMessage(location);
534        }
535
536        @Override
537        public boolean isEmpty() throws IOException {
538            indexLock.writeLock().lock();
539            try {
540                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
541                    @Override
542                    public Boolean execute(Transaction tx) throws IOException {
543                        // Iterate through all index entries to get a count of
544                        // messages in the destination.
545                        StoredDestination sd = getStoredDestination(dest, tx);
546                        return sd.locationIndex.isEmpty(tx);
547                    }
548                });
549            } finally {
550                indexLock.writeLock().unlock();
551            }
552        }
553
554        @Override
555        public void recover(final MessageRecoveryListener listener) throws Exception {
556            // recovery may involve expiry which will modify
557            indexLock.writeLock().lock();
558            try {
559                pageFile.tx().execute(new Transaction.Closure<Exception>() {
560                    @Override
561                    public void execute(Transaction tx) throws Exception {
562                        StoredDestination sd = getStoredDestination(dest, tx);
563                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
564                        sd.orderIndex.resetCursorPosition();
565                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
566                                .hasNext(); ) {
567                            Entry<Long, MessageKeys> entry = iterator.next();
568                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
569                                continue;
570                            }
571                            Message msg = loadMessage(entry.getValue().location);
572                            listener.recoverMessage(msg);
573                        }
574                    }
575                });
576            } finally {
577                indexLock.writeLock().unlock();
578            }
579        }
580
581        @Override
582        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
583            indexLock.writeLock().lock();
584            try {
585                pageFile.tx().execute(new Transaction.Closure<Exception>() {
586                    @Override
587                    public void execute(Transaction tx) throws Exception {
588                        StoredDestination sd = getStoredDestination(dest, tx);
589                        Entry<Long, MessageKeys> entry = null;
590                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
591                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
592                            entry = iterator.next();
593                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
594                                continue;
595                            }
596                            Message msg = loadMessage(entry.getValue().location);
597                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
598                            listener.recoverMessage(msg);
599                            counter++;
600                            if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
601                                break;
602                            }
603                        }
604                        sd.orderIndex.stoppedIterating();
605                    }
606                });
607            } finally {
608                indexLock.writeLock().unlock();
609            }
610        }
611
612        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
613            int counter = 0;
614            String id;
615            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
616                id = iterator.next();
617                iterator.remove();
618                Long sequence = sd.messageIdIndex.get(tx, id);
619                if (sequence != null) {
620                    if (sd.orderIndex.alreadyDispatched(sequence)) {
621                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
622                        counter++;
623                        if (counter >= maxReturned) {
624                            break;
625                        }
626                    } else {
627                        LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
628                    }
629                } else {
630                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
631                }
632            }
633            return counter;
634        }
635
636
637        @Override
638        public void resetBatching() {
639            if (pageFile.isLoaded()) {
640                indexLock.writeLock().lock();
641                try {
642                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
643                        @Override
644                        public void execute(Transaction tx) throws Exception {
645                            StoredDestination sd = getExistingStoredDestination(dest, tx);
646                            if (sd != null) {
647                                sd.orderIndex.resetCursorPosition();}
648                            }
649                        });
650                } catch (Exception e) {
651                    LOG.error("Failed to reset batching",e);
652                } finally {
653                    indexLock.writeLock().unlock();
654                }
655            }
656        }
657
658        @Override
659        public void setBatch(final MessageId identity) throws IOException {
660            indexLock.writeLock().lock();
661            try {
662                pageFile.tx().execute(new Transaction.Closure<IOException>() {
663                    @Override
664                    public void execute(Transaction tx) throws IOException {
665                        StoredDestination sd = getStoredDestination(dest, tx);
666                        Long location = (Long) identity.getFutureOrSequenceLong();
667                        Long pending = sd.orderIndex.minPendingAdd();
668                        if (pending != null) {
669                            location = Math.min(location, pending-1);
670                        }
671                        sd.orderIndex.setBatch(tx, location);
672                    }
673                });
674            } finally {
675                indexLock.writeLock().unlock();
676            }
677        }
678
679        @Override
680        public void setMemoryUsage(MemoryUsage memoryUsage) {
681        }
682        @Override
683        public void start() throws Exception {
684            super.start();
685        }
686        @Override
687        public void stop() throws Exception {
688            super.stop();
689        }
690
691        protected void lockAsyncJobQueue() {
692            try {
693                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
694                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
695                }
696            } catch (Exception e) {
697                LOG.error("Failed to lock async jobs for " + this.destination, e);
698            }
699        }
700
701        protected void unlockAsyncJobQueue() {
702            this.localDestinationSemaphore.release(this.maxAsyncJobs);
703        }
704
705        protected void acquireLocalAsyncLock() {
706            try {
707                this.localDestinationSemaphore.acquire();
708            } catch (InterruptedException e) {
709                LOG.error("Failed to aquire async lock for " + this.destination, e);
710            }
711        }
712
713        protected void releaseLocalAsyncLock() {
714            this.localDestinationSemaphore.release();
715        }
716
717        @Override
718        public String toString(){
719            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
720        }
721
722        @Override
723        protected void recoverMessageStoreStatistics() throws IOException {
724            try {
725                MessageStoreStatistics recoveredStatistics;
726                lockAsyncJobQueue();
727                indexLock.writeLock().lock();
728                try {
729                    recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
730                        @Override
731                        public MessageStoreStatistics execute(Transaction tx) throws IOException {
732                            MessageStoreStatistics statistics = new MessageStoreStatistics();
733
734                            // Iterate through all index entries to get the size of each message
735                            StoredDestination sd = getStoredDestination(dest, tx);
736                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
737                                int locationSize = iterator.next().getKey().getSize();
738                                statistics.getMessageCount().increment();
739                                statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
740                            }
741                           return statistics;
742                        }
743                    });
744                    getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
745                    getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
746                } finally {
747                    indexLock.writeLock().unlock();
748                }
749            } finally {
750                unlockAsyncJobQueue();
751            }
752        }
753    }
754
755    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
756        private final AtomicInteger subscriptionCount = new AtomicInteger();
757        protected final MessageStoreSubscriptionStatistics messageStoreSubStats =
758                new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics());
759
760        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
761            super(destination);
762            this.subscriptionCount.set(getAllSubscriptions().length);
763            if (isConcurrentStoreAndDispatchTopics()) {
764                asyncTopicMaps.add(asyncTaskMap);
765            }
766        }
767
768        @Override
769        protected void recoverMessageStoreStatistics() throws IOException {
770            super.recoverMessageStoreStatistics();
771            this.recoverMessageStoreSubMetrics();
772        }
773
774        @Override
775        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
776                throws IOException {
777            if (isConcurrentStoreAndDispatchTopics()) {
778                message.beforeMarshall(wireFormat);
779                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
780                result.aquireLocks();
781                addTopicTask(this, result);
782                return result.getFuture();
783            } else {
784                return super.asyncAddTopicMessage(context, message);
785            }
786        }
787
788        @Override
789        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
790                                MessageId messageId, MessageAck ack) throws IOException {
791            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
792            if (isConcurrentStoreAndDispatchTopics()) {
793                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
794                StoreTopicTask task = null;
795                synchronized (asyncTaskMap) {
796                    task = (StoreTopicTask) asyncTaskMap.get(key);
797                }
798                if (task != null) {
799                    if (task.addSubscriptionKey(subscriptionKey)) {
800                        removeTopicTask(this, messageId);
801                        if (task.cancel()) {
802                            synchronized (asyncTaskMap) {
803                                asyncTaskMap.remove(key);
804                            }
805                        }
806                    }
807                } else {
808                    doAcknowledge(context, subscriptionKey, messageId, ack);
809                }
810            } else {
811                doAcknowledge(context, subscriptionKey, messageId, ack);
812            }
813        }
814
815        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
816                throws IOException {
817            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
818            command.setDestination(dest);
819            command.setSubscriptionKey(subscriptionKey);
820            command.setMessageId(messageId.toProducerKey());
821            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
822            if (ack != null && ack.isUnmatchedAck()) {
823                command.setAck(UNMATCHED);
824            } else {
825                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
826                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
827            }
828            store(command, false, null, null);
829        }
830
831        @Override
832        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
833            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
834                    .getSubscriptionName());
835            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
836            command.setDestination(dest);
837            command.setSubscriptionKey(subscriptionKey.toString());
838            command.setRetroactive(retroactive);
839            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
840            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
841            store(command, isEnableJournalDiskSyncs() && true, null, null);
842            this.subscriptionCount.incrementAndGet();
843        }
844
845        @Override
846        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
847            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
848            command.setDestination(dest);
849            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
850            store(command, isEnableJournalDiskSyncs() && true, null, null);
851            this.subscriptionCount.decrementAndGet();
852        }
853
854        @Override
855        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
856
857            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
858            indexLock.writeLock().lock();
859            try {
860                pageFile.tx().execute(new Transaction.Closure<IOException>() {
861                    @Override
862                    public void execute(Transaction tx) throws IOException {
863                        StoredDestination sd = getStoredDestination(dest, tx);
864                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
865                                .hasNext();) {
866                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
867                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
868                                    .getValue().getSubscriptionInfo().newInput()));
869                            subscriptions.add(info);
870
871                        }
872                    }
873                });
874            } finally {
875                indexLock.writeLock().unlock();
876            }
877
878            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
879            subscriptions.toArray(rc);
880            return rc;
881        }
882
883        @Override
884        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
885            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
886            indexLock.writeLock().lock();
887            try {
888                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
889                    @Override
890                    public SubscriptionInfo execute(Transaction tx) throws IOException {
891                        StoredDestination sd = getStoredDestination(dest, tx);
892                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
893                        if (command == null) {
894                            return null;
895                        }
896                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
897                                .getSubscriptionInfo().newInput()));
898                    }
899                });
900            } finally {
901                indexLock.writeLock().unlock();
902            }
903        }
904
905        @Override
906        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
907            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
908
909            if (isEnableSubscriptionStatistics()) {
910                return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount();
911            } else {
912
913                indexLock.writeLock().lock();
914                try {
915                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
916                        @Override
917                        public Integer execute(Transaction tx) throws IOException {
918                            StoredDestination sd = getStoredDestination(dest, tx);
919                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
920                            if (cursorPos == null) {
921                                // The subscription might not exist.
922                                return 0;
923                            }
924
925                            return (int) getStoredMessageCount(tx, sd, subscriptionKey);
926                        }
927                    });
928                } finally {
929                    indexLock.writeLock().unlock();
930                }
931            }
932        }
933
934
935        @Override
936        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
937            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
938            if (isEnableSubscriptionStatistics()) {
939                return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize();
940            } else {
941                indexLock.writeLock().lock();
942                try {
943                    return pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
944                        @Override
945                        public Long execute(Transaction tx) throws IOException {
946                            StoredDestination sd = getStoredDestination(dest, tx);
947                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
948                            if (cursorPos == null) {
949                                // The subscription might not exist.
950                                return 0l;
951                            }
952
953                            return getStoredMessageSize(tx, sd, subscriptionKey);
954                        }
955                    });
956                } finally {
957                    indexLock.writeLock().unlock();
958                }
959            }
960        }
961
962        protected void recoverMessageStoreSubMetrics() throws IOException {
963            if (isEnableSubscriptionStatistics()) {
964                final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics();
965                indexLock.writeLock().lock();
966                try {
967                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
968                        @Override
969                        public void execute(Transaction tx) throws IOException {
970                            StoredDestination sd = getStoredDestination(dest, tx);
971
972                            List<String> subscriptionKeys = new ArrayList<>();
973                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions
974                                    .iterator(tx); iterator.hasNext();) {
975                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
976
977                                final String subscriptionKey = entry.getKey();
978                                final LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
979                                if (cursorPos != null) {
980                                    //add the subscriptions to a list for recovering pending sizes below
981                                    subscriptionKeys.add(subscriptionKey);
982                                    //recover just the count here as that is fast
983                                    statistics.getMessageCount(subscriptionKey)
984                                            .setCount(getStoredMessageCount(tx, sd, subscriptionKey));
985                                }
986                            }
987
988                            //Recover the message sizes for each subscription by iterating only 1 time over the order index
989                            //to speed up recovery
990                            final Map<String, AtomicLong> subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys);
991                            subPendingMessageSizes.forEach((k,v) -> {
992                                statistics.getMessageSize(k).addSize(v.get() > 0 ? v.get() : 0);
993                            });
994                        }
995                    });
996                } finally {
997                    indexLock.writeLock().unlock();
998                }
999            }
1000        }
1001
1002        @Override
1003        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
1004                throws Exception {
1005            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1006            @SuppressWarnings("unused")
1007            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
1008            indexLock.writeLock().lock();
1009            try {
1010                pageFile.tx().execute(new Transaction.Closure<Exception>() {
1011                    @Override
1012                    public void execute(Transaction tx) throws Exception {
1013                        StoredDestination sd = getStoredDestination(dest, tx);
1014                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
1015                        SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
1016                        //If we have ackPositions tracked then compare the first one as individual acknowledge mode
1017                        //may have bumped lastAck even though there are earlier messages to still consume
1018                        if (subAckPositions != null && !subAckPositions.isEmpty()
1019                                && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) {
1020                            //we have messages to ack before lastAckedSequence
1021                            sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
1022                        } else {
1023                            subAckPositions = null;
1024                            sd.orderIndex.setBatch(tx, cursorPos);
1025                        }
1026                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
1027                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
1028                                .hasNext();) {
1029                            Entry<Long, MessageKeys> entry = iterator.next();
1030                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
1031                                continue;
1032                            }
1033                            //If subAckPositions is set then verify the sequence set contains the message still
1034                            //and if it doesn't skip it
1035                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
1036                                continue;
1037                            }
1038                            listener.recoverMessage(loadMessage(entry.getValue().location));
1039                        }
1040                        sd.orderIndex.resetCursorPosition();
1041                    }
1042                });
1043            } finally {
1044                indexLock.writeLock().unlock();
1045            }
1046        }
1047
1048        @Override
1049        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
1050                final MessageRecoveryListener listener) throws Exception {
1051            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1052            @SuppressWarnings("unused")
1053            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
1054            indexLock.writeLock().lock();
1055            try {
1056                pageFile.tx().execute(new Transaction.Closure<Exception>() {
1057                    @Override
1058                    public void execute(Transaction tx) throws Exception {
1059                        StoredDestination sd = getStoredDestination(dest, tx);
1060                        sd.orderIndex.resetCursorPosition();
1061                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
1062                        SequenceSet subAckPositions = null;
1063                        if (moc == null) {
1064                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
1065                            if (pos == null) {
1066                                // sub deleted
1067                                return;
1068                            }
1069                            subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
1070                            //If we have ackPositions tracked then compare the first one as individual acknowledge mode
1071                            //may have bumped lastAck even though there are earlier messages to still consume
1072                            if (subAckPositions != null && !subAckPositions.isEmpty()
1073                                    && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) {
1074                                //we have messages to ack before lastAckedSequence
1075                                sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
1076                            } else {
1077                                subAckPositions = null;
1078                                sd.orderIndex.setBatch(tx, pos);
1079                            }
1080                            moc = sd.orderIndex.cursor;
1081                        } else {
1082                            sd.orderIndex.cursor.sync(moc);
1083                        }
1084
1085                        Entry<Long, MessageKeys> entry = null;
1086                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
1087                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
1088                                .hasNext();) {
1089                            entry = iterator.next();
1090                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
1091                                continue;
1092                            }
1093                            //If subAckPositions is set then verify the sequence set contains the message still
1094                            //and if it doesn't skip it
1095                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
1096                                continue;
1097                            }
1098                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
1099                                counter++;
1100                            }
1101                            if (counter >= maxReturned || listener.hasSpace() == false) {
1102                                break;
1103                            }
1104                        }
1105                        sd.orderIndex.stoppedIterating();
1106                        if (entry != null) {
1107                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
1108                            sd.subscriptionCursors.put(subscriptionKey, copy);
1109                        }
1110                    }
1111                });
1112            } finally {
1113                indexLock.writeLock().unlock();
1114            }
1115        }
1116
1117        @Override
1118        public void resetBatching(String clientId, String subscriptionName) {
1119            try {
1120                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1121                indexLock.writeLock().lock();
1122                try {
1123                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1124                        @Override
1125                        public void execute(Transaction tx) throws IOException {
1126                            StoredDestination sd = getStoredDestination(dest, tx);
1127                            sd.subscriptionCursors.remove(subscriptionKey);
1128                        }
1129                    });
1130                }finally {
1131                    indexLock.writeLock().unlock();
1132                }
1133            } catch (IOException e) {
1134                throw new RuntimeException(e);
1135            }
1136        }
1137
1138        @Override
1139        public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
1140            return messageStoreSubStats;
1141        }
1142    }
1143
1144    String subscriptionKey(String clientId, String subscriptionName) {
1145        return clientId + ":" + subscriptionName;
1146    }
1147
1148    @Override
1149    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
1150        String key = key(convert(destination));
1151        MessageStore store = storeCache.get(key(convert(destination)));
1152        if (store == null) {
1153            final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
1154            store = storeCache.putIfAbsent(key, queueStore);
1155            if (store == null) {
1156                store = queueStore;
1157            }
1158        }
1159
1160        return store;
1161    }
1162
1163    @Override
1164    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1165        String key = key(convert(destination));
1166        MessageStore store = storeCache.get(key(convert(destination)));
1167        if (store == null) {
1168            final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1169            store = storeCache.putIfAbsent(key, topicStore);
1170            if (store == null) {
1171                store = topicStore;
1172            }
1173        }
1174
1175        return (TopicMessageStore) store;
1176    }
1177
1178    /**
1179     * Cleanup method to remove any state associated with the given destination.
1180     * This method does not stop the message store (it might not be cached).
1181     *
1182     * @param destination
1183     *            Destination to forget
1184     */
1185    @Override
1186    public void removeQueueMessageStore(ActiveMQQueue destination) {
1187    }
1188
1189    /**
1190     * Cleanup method to remove any state associated with the given destination
1191     * This method does not stop the message store (it might not be cached).
1192     *
1193     * @param destination
1194     *            Destination to forget
1195     */
1196    @Override
1197    public void removeTopicMessageStore(ActiveMQTopic destination) {
1198    }
1199
1200    @Override
1201    public void deleteAllMessages() throws IOException {
1202        deleteAllMessages = true;
1203    }
1204
1205    @Override
1206    public Set<ActiveMQDestination> getDestinations() {
1207        try {
1208            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1209            indexLock.writeLock().lock();
1210            try {
1211                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1212                    @Override
1213                    public void execute(Transaction tx) throws IOException {
1214                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1215                                .hasNext();) {
1216                            Entry<String, StoredDestination> entry = iterator.next();
1217                            //Removing isEmpty topic check - see AMQ-5875
1218                            rc.add(convert(entry.getKey()));
1219                        }
1220                    }
1221                });
1222            }finally {
1223                indexLock.writeLock().unlock();
1224            }
1225            return rc;
1226        } catch (IOException e) {
1227            throw new RuntimeException(e);
1228        }
1229    }
1230
1231    @Override
1232    public long getLastMessageBrokerSequenceId() throws IOException {
1233        return 0;
1234    }
1235
1236    @Override
1237    public long getLastProducerSequenceId(ProducerId id) {
1238        indexLock.writeLock().lock();
1239        try {
1240            return metadata.producerSequenceIdTracker.getLastSeqId(id);
1241        } finally {
1242            indexLock.writeLock().unlock();
1243        }
1244    }
1245
1246    @Override
1247    public long size() {
1248        try {
1249            return journalSize.get() + getPageFile().getDiskSize();
1250        } catch (IOException e) {
1251            throw new RuntimeException(e);
1252        }
1253    }
1254
1255    @Override
1256    public void beginTransaction(ConnectionContext context) throws IOException {
1257        throw new IOException("Not yet implemented.");
1258    }
1259    @Override
1260    public void commitTransaction(ConnectionContext context) throws IOException {
1261        throw new IOException("Not yet implemented.");
1262    }
1263    @Override
1264    public void rollbackTransaction(ConnectionContext context) throws IOException {
1265        throw new IOException("Not yet implemented.");
1266    }
1267
1268    @Override
1269    public void checkpoint(boolean sync) throws IOException {
1270        super.checkpointCleanup(sync);
1271    }
1272
1273    // /////////////////////////////////////////////////////////////////
1274    // Internal helper methods.
1275    // /////////////////////////////////////////////////////////////////
1276
1277    /**
1278     * @param location
1279     * @return
1280     * @throws IOException
1281     */
1282    Message loadMessage(Location location) throws IOException {
1283        try {
1284            JournalCommand<?> command = load(location);
1285            KahaAddMessageCommand addMessage = null;
1286            switch (command.type()) {
1287                case KAHA_UPDATE_MESSAGE_COMMAND:
1288                    addMessage = ((KahaUpdateMessageCommand) command).getMessage();
1289                    break;
1290                case KAHA_ADD_MESSAGE_COMMAND:
1291                    addMessage = (KahaAddMessageCommand) command;
1292                    break;
1293                default:
1294                    throw new IOException("Could not load journal record, unexpected command type: " + command.type() + " at location: " + location);
1295            }
1296            if (!addMessage.hasMessage()) {
1297                throw new IOException("Could not load journal record, null message content at location: " + location);
1298            }
1299            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1300            return msg;
1301        } catch (Throwable t) {
1302            IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t);
1303            LOG.error("Failed to load message at: {}", location , ioe);
1304            brokerService.handleIOException(ioe);
1305            throw ioe;
1306        }
1307    }
1308
1309    // /////////////////////////////////////////////////////////////////
1310    // Internal conversion methods.
1311    // /////////////////////////////////////////////////////////////////
1312
1313    KahaLocation convert(Location location) {
1314        KahaLocation rc = new KahaLocation();
1315        rc.setLogId(location.getDataFileId());
1316        rc.setOffset(location.getOffset());
1317        return rc;
1318    }
1319
1320    KahaDestination convert(ActiveMQDestination dest) {
1321        KahaDestination rc = new KahaDestination();
1322        rc.setName(dest.getPhysicalName());
1323        switch (dest.getDestinationType()) {
1324        case ActiveMQDestination.QUEUE_TYPE:
1325            rc.setType(DestinationType.QUEUE);
1326            return rc;
1327        case ActiveMQDestination.TOPIC_TYPE:
1328            rc.setType(DestinationType.TOPIC);
1329            return rc;
1330        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1331            rc.setType(DestinationType.TEMP_QUEUE);
1332            return rc;
1333        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1334            rc.setType(DestinationType.TEMP_TOPIC);
1335            return rc;
1336        default:
1337            return null;
1338        }
1339    }
1340
1341    ActiveMQDestination convert(String dest) {
1342        int p = dest.indexOf(":");
1343        if (p < 0) {
1344            throw new IllegalArgumentException("Not in the valid destination format");
1345        }
1346        int type = Integer.parseInt(dest.substring(0, p));
1347        String name = dest.substring(p + 1);
1348        return convert(type, name);
1349    }
1350
1351    private ActiveMQDestination convert(KahaDestination commandDestination) {
1352        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1353    }
1354
1355    private ActiveMQDestination convert(int type, String name) {
1356        switch (KahaDestination.DestinationType.valueOf(type)) {
1357        case QUEUE:
1358            return new ActiveMQQueue(name);
1359        case TOPIC:
1360            return new ActiveMQTopic(name);
1361        case TEMP_QUEUE:
1362            return new ActiveMQTempQueue(name);
1363        case TEMP_TOPIC:
1364            return new ActiveMQTempTopic(name);
1365        default:
1366            throw new IllegalArgumentException("Not in the valid destination format");
1367        }
1368    }
1369
1370    public TransactionIdTransformer getTransactionIdTransformer() {
1371        return transactionIdTransformer;
1372    }
1373
1374    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1375        this.transactionIdTransformer = transactionIdTransformer;
1376    }
1377
1378    static class AsyncJobKey {
1379        MessageId id;
1380        ActiveMQDestination destination;
1381
1382        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1383            this.id = id;
1384            this.destination = destination;
1385        }
1386
1387        @Override
1388        public boolean equals(Object obj) {
1389            if (obj == this) {
1390                return true;
1391            }
1392            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1393                    && destination.equals(((AsyncJobKey) obj).destination);
1394        }
1395
1396        @Override
1397        public int hashCode() {
1398            return id.hashCode() + destination.hashCode();
1399        }
1400
1401        @Override
1402        public String toString() {
1403            return destination.getPhysicalName() + "-" + id;
1404        }
1405    }
1406
1407    public interface StoreTask {
1408        public boolean cancel();
1409
1410        public void aquireLocks();
1411
1412        public void releaseLocks();
1413    }
1414
1415    class StoreQueueTask implements Runnable, StoreTask {
1416        protected final Message message;
1417        protected final ConnectionContext context;
1418        protected final KahaDBMessageStore store;
1419        protected final InnerFutureTask future;
1420        protected final AtomicBoolean done = new AtomicBoolean();
1421        protected final AtomicBoolean locked = new AtomicBoolean();
1422
1423        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1424            this.store = store;
1425            this.context = context;
1426            this.message = message;
1427            this.future = new InnerFutureTask(this);
1428        }
1429
1430        public ListenableFuture<Object> getFuture() {
1431            return this.future;
1432        }
1433
1434        @Override
1435        public boolean cancel() {
1436            if (this.done.compareAndSet(false, true)) {
1437                return this.future.cancel(false);
1438            }
1439            return false;
1440        }
1441
1442        @Override
1443        public void aquireLocks() {
1444            if (this.locked.compareAndSet(false, true)) {
1445                try {
1446                    globalQueueSemaphore.acquire();
1447                    store.acquireLocalAsyncLock();
1448                    message.incrementReferenceCount();
1449                } catch (InterruptedException e) {
1450                    LOG.warn("Failed to aquire lock", e);
1451                }
1452            }
1453
1454        }
1455
1456        @Override
1457        public void releaseLocks() {
1458            if (this.locked.compareAndSet(true, false)) {
1459                store.releaseLocalAsyncLock();
1460                globalQueueSemaphore.release();
1461                message.decrementReferenceCount();
1462            }
1463        }
1464
1465        @Override
1466        public void run() {
1467            this.store.doneTasks++;
1468            try {
1469                if (this.done.compareAndSet(false, true)) {
1470                    this.store.addMessage(context, message);
1471                    removeQueueTask(this.store, this.message.getMessageId());
1472                    this.future.complete();
1473                } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) {
1474                    System.err.println(this.store.dest.getName() + " cancelled: "
1475                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1476                    this.store.canceledTasks = this.store.doneTasks = 0;
1477                }
1478            } catch (Throwable t) {
1479                this.future.setException(t);
1480                removeQueueTask(this.store, this.message.getMessageId());
1481            }
1482        }
1483
1484        protected Message getMessage() {
1485            return this.message;
1486        }
1487
1488        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1489
1490            private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();
1491
1492            public InnerFutureTask(Runnable runnable) {
1493                super(runnable, null);
1494            }
1495
1496            @Override
1497            public void setException(final Throwable e) {
1498                super.setException(e);
1499            }
1500
1501            public void complete() {
1502                super.set(null);
1503            }
1504
1505            @Override
1506            public void done() {
1507                fireListener();
1508            }
1509
1510            @Override
1511            public void addListener(Runnable listener) {
1512                this.listenerRef.set(listener);
1513                if (isDone()) {
1514                    fireListener();
1515                }
1516            }
1517
1518            private void fireListener() {
1519                Runnable listener = listenerRef.getAndSet(null);
1520                if (listener != null) {
1521                    try {
1522                        listener.run();
1523                    } catch (Exception ignored) {
1524                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1525                    }
1526                }
1527            }
1528        }
1529    }
1530
1531    class StoreTopicTask extends StoreQueueTask {
1532        private final int subscriptionCount;
1533        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1534        private final KahaDBTopicMessageStore topicStore;
1535        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1536                int subscriptionCount) {
1537            super(store, context, message);
1538            this.topicStore = store;
1539            this.subscriptionCount = subscriptionCount;
1540
1541        }
1542
1543        @Override
1544        public void aquireLocks() {
1545            if (this.locked.compareAndSet(false, true)) {
1546                try {
1547                    globalTopicSemaphore.acquire();
1548                    store.acquireLocalAsyncLock();
1549                    message.incrementReferenceCount();
1550                } catch (InterruptedException e) {
1551                    LOG.warn("Failed to aquire lock", e);
1552                }
1553            }
1554        }
1555
1556        @Override
1557        public void releaseLocks() {
1558            if (this.locked.compareAndSet(true, false)) {
1559                message.decrementReferenceCount();
1560                store.releaseLocalAsyncLock();
1561                globalTopicSemaphore.release();
1562            }
1563        }
1564
1565        /**
1566         * add a key
1567         *
1568         * @param key
1569         * @return true if all acknowledgements received
1570         */
1571        public boolean addSubscriptionKey(String key) {
1572            synchronized (this.subscriptionKeys) {
1573                this.subscriptionKeys.add(key);
1574            }
1575            return this.subscriptionKeys.size() >= this.subscriptionCount;
1576        }
1577
1578        @Override
1579        public void run() {
1580            this.store.doneTasks++;
1581            try {
1582                if (this.done.compareAndSet(false, true)) {
1583                    this.topicStore.addMessage(context, message);
1584                    // apply any acks we have
1585                    synchronized (this.subscriptionKeys) {
1586                        for (String key : this.subscriptionKeys) {
1587                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1588
1589                        }
1590                    }
1591                    removeTopicTask(this.topicStore, this.message.getMessageId());
1592                    this.future.complete();
1593                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1594                    System.err.println(this.store.dest.getName() + " cancelled: "
1595                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1596                    this.store.canceledTasks = this.store.doneTasks = 0;
1597                }
1598            } catch (Throwable t) {
1599                this.future.setException(t);
1600                removeTopicTask(this.topicStore, this.message.getMessageId());
1601            }
1602        }
1603    }
1604
1605    public class StoreTaskExecutor extends ThreadPoolExecutor {
1606
1607        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1608            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1609        }
1610
1611        @Override
1612        protected void afterExecute(Runnable runnable, Throwable throwable) {
1613            super.afterExecute(runnable, throwable);
1614
1615            if (runnable instanceof StoreTask) {
1616               ((StoreTask)runnable).releaseLocks();
1617            }
1618        }
1619    }
1620
1621    @Override
1622    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
1623        return new JobSchedulerStoreImpl();
1624    }
1625
1626    /* (non-Javadoc)
1627     * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
1628     */
1629    @Override
1630    public boolean isPersistNoLocal() {
1631        // Prior to v11 the broker did not store the noLocal value for durable subs.
1632        return brokerService.getStoreOpenWireVersion() >= 11;
1633    }
1634}