001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.DataInput;
024import java.io.DataOutput;
025import java.io.EOFException;
026import java.io.File;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.InterruptedIOException;
030import java.io.ObjectInputStream;
031import java.io.ObjectOutputStream;
032import java.io.OutputStream;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Date;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.LinkedHashMap;
042import java.util.LinkedHashSet;
043import java.util.LinkedList;
044import java.util.List;
045import java.util.Map;
046import java.util.Map.Entry;
047import java.util.Set;
048import java.util.SortedSet;
049import java.util.TreeSet;
050import java.util.concurrent.ConcurrentHashMap;
051import java.util.concurrent.ConcurrentMap;
052import java.util.concurrent.Executors;
053import java.util.concurrent.ScheduledExecutorService;
054import java.util.concurrent.ThreadFactory;
055import java.util.concurrent.TimeUnit;
056import java.util.concurrent.atomic.AtomicBoolean;
057import java.util.concurrent.atomic.AtomicLong;
058import java.util.concurrent.atomic.AtomicReference;
059import java.util.concurrent.locks.ReentrantReadWriteLock;
060
061import org.apache.activemq.ActiveMQMessageAuditNoSync;
062import org.apache.activemq.broker.BrokerService;
063import org.apache.activemq.broker.BrokerServiceAware;
064import org.apache.activemq.broker.region.Destination;
065import org.apache.activemq.broker.region.Queue;
066import org.apache.activemq.broker.region.Topic;
067import org.apache.activemq.command.MessageAck;
068import org.apache.activemq.command.TransactionId;
069import org.apache.activemq.openwire.OpenWireFormat;
070import org.apache.activemq.protobuf.Buffer;
071import org.apache.activemq.store.MessageStore;
072import org.apache.activemq.store.MessageStoreStatistics;
073import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
074import org.apache.activemq.store.PersistenceAdapterStatistics;
075import org.apache.activemq.store.TopicMessageStore;
076import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
077import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
078import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
079import org.apache.activemq.store.kahadb.data.KahaDestination;
080import org.apache.activemq.store.kahadb.data.KahaEntryType;
081import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
082import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
083import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
084import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
085import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
086import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
087import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
088import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
089import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
090import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
091import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
092import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
093import org.apache.activemq.store.kahadb.disk.index.ListIndex;
094import org.apache.activemq.store.kahadb.disk.journal.DataFile;
095import org.apache.activemq.store.kahadb.disk.journal.Journal;
096import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
097import org.apache.activemq.store.kahadb.disk.journal.Location;
098import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
099import org.apache.activemq.store.kahadb.disk.page.Page;
100import org.apache.activemq.store.kahadb.disk.page.PageFile;
101import org.apache.activemq.store.kahadb.disk.page.Transaction;
102import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
103import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
104import org.apache.activemq.store.kahadb.disk.util.Marshaller;
105import org.apache.activemq.store.kahadb.disk.util.Sequence;
106import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
107import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
108import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
109import org.apache.activemq.util.ByteSequence;
110import org.apache.activemq.util.DataByteArrayInputStream;
111import org.apache.activemq.util.DataByteArrayOutputStream;
112import org.apache.activemq.util.IOExceptionSupport;
113import org.apache.activemq.util.IOHelper;
114import org.apache.activemq.util.ServiceStopper;
115import org.apache.activemq.util.ServiceSupport;
116import org.apache.activemq.util.ThreadPoolUtils;
117import org.slf4j.Logger;
118import org.slf4j.LoggerFactory;
119import org.slf4j.MDC;
120
121public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
122
    protected BrokerService brokerService;

    // System property name for the slow-access threshold; read once into LOG_SLOW_ACCESS_TIME below.
    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
    // Threshold read from the system property; defaults to 0 (presumably "disabled" — confirm against usage).
    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
    // Shared empty-buffer sentinel.
    protected static final Buffer UNMATCHED;
    static {
        UNMATCHED = new Buffer(new byte[]{});
    }
    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);

    // Values stored in Metadata.state; CLOSED_STATE is written on clean unload (see unload()).
    static final int CLOSED_STATE = 1;
    static final int OPEN_STATE = 2;
    static final long NOT_ACKED = -1;

    // Current metadata format version; persisted by Metadata.write and checked in Metadata.read.
    static final int VERSION = 6;

    // File-type marker one above the standard log file type — presumably tags rewritten
    // (ack-compacted) journal files; confirm against the compaction code elsewhere in this file.
    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
141
142    protected class Metadata {
143        protected Page<Metadata> page;
144        protected int state;
145        protected BTreeIndex<String, StoredDestination> destinations;
146        protected Location lastUpdate;
147        protected Location firstInProgressTransactionLocation;
148        protected Location producerSequenceIdTrackerLocation = null;
149        protected Location ackMessageFileMapLocation = null;
150        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
151        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>();
152        protected transient AtomicBoolean ackMessageFileMapDirtyFlag = new AtomicBoolean(false);
153        protected int version = VERSION;
154        protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
155
156        public void read(DataInput is) throws IOException {
157            state = is.readInt();
158            destinations = new BTreeIndex<>(pageFile, is.readLong());
159            if (is.readBoolean()) {
160                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
161            } else {
162                lastUpdate = null;
163            }
164            if (is.readBoolean()) {
165                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
166            } else {
167                firstInProgressTransactionLocation = null;
168            }
169            try {
170                if (is.readBoolean()) {
171                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
172                } else {
173                    producerSequenceIdTrackerLocation = null;
174                }
175            } catch (EOFException expectedOnUpgrade) {
176            }
177            try {
178                version = is.readInt();
179            } catch (EOFException expectedOnUpgrade) {
180                version = 1;
181            }
182            if (version >= 5 && is.readBoolean()) {
183                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
184            } else {
185                ackMessageFileMapLocation = null;
186            }
187            try {
188                openwireVersion = is.readInt();
189            } catch (EOFException expectedOnUpgrade) {
190                openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
191            }
192            LOG.info("KahaDB is version " + version);
193        }
194
195        public void write(DataOutput os) throws IOException {
196            os.writeInt(state);
197            os.writeLong(destinations.getPageId());
198
199            if (lastUpdate != null) {
200                os.writeBoolean(true);
201                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
202            } else {
203                os.writeBoolean(false);
204            }
205
206            if (firstInProgressTransactionLocation != null) {
207                os.writeBoolean(true);
208                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
209            } else {
210                os.writeBoolean(false);
211            }
212
213            if (producerSequenceIdTrackerLocation != null) {
214                os.writeBoolean(true);
215                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
216            } else {
217                os.writeBoolean(false);
218            }
219            os.writeInt(VERSION);
220            if (ackMessageFileMapLocation != null) {
221                os.writeBoolean(true);
222                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
223            } else {
224                os.writeBoolean(false);
225            }
226            os.writeInt(this.openwireVersion);
227        }
228    }
229
230    class MetadataMarshaller extends VariableMarshaller<Metadata> {
231        @Override
232        public Metadata readPayload(DataInput dataIn) throws IOException {
233            Metadata rc = createMetadata();
234            rc.read(dataIn);
235            return rc;
236        }
237
238        @Override
239        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
240            object.write(dataOut);
241        }
242    }
243
    /**
     * How to treat XA transactions found during recovery — presumably applied
     * to prepared transactions replayed from the journal (confirm against the
     * recovery code elsewhere in this file): leave them pending (NEVER) or
     * force an outcome (COMMIT / ROLLBACK).
     */
    public enum PurgeRecoveredXATransactionStrategy {
        NEVER,
        COMMIT,
        ROLLBACK;
    }
249
    // --- Core storage components ---
    protected PageFile pageFile;
    protected Journal journal;
    protected Metadata metadata = new Metadata();
    protected final PersistenceAdapterStatistics persistenceAdapterStatistics = new PersistenceAdapterStatistics();

    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();

    protected boolean failIfDatabaseIsLocked;

    // --- Directory / startup configuration ---
    protected boolean deleteAllMessages;
    protected File directory = DEFAULT_DIRECTORY;
    protected File indexDirectory = null;
    // Single-thread scheduler running the CheckpointRunner; guarded by schedulerLock.
    protected ScheduledExecutorService scheduler;
    private final Object schedulerLock = new Object();

    // --- Journal configuration ---
    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
    protected boolean archiveDataLogs;
    protected File directoryArchive;
    protected AtomicLong journalSize = new AtomicLong(0);
    // Intervals in milliseconds; 0 disables the corresponding periodic task (see startCheckpoint()).
    long journalDiskSyncInterval = 1000;
    long checkpointInterval = 5*1000;
    long cleanupInterval = 30*1000;
    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    boolean enableIndexWriteAsync = false;
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();

    // --- Runtime state and index/recovery tuning ---
    // Flipped by open()/close(); gates the checkpoint runner and double-open/close.
    protected AtomicBoolean opened = new AtomicBoolean();
    private boolean ignoreMissingJournalfiles = false;
    private int indexCacheSize = 10000;
    private boolean checkForCorruptJournalFiles = false;
    protected PurgeRecoveredXATransactionStrategy purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.NEVER;
    private boolean checksumJournalFiles = true;
    protected boolean forceRecoverIndex = false;
    private boolean archiveCorruptedIndex = false;
    private boolean useIndexLFRUEviction = false;
    private float indexLFUEvictionFactor = 0.2f;
    private boolean enableIndexDiskSyncs = true;
    private boolean enableIndexRecoveryFile = true;
    private boolean enableIndexPageCaching = true;
    // Write lock held for the duration of a checkpoint (see close()).
    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    // --- Ack compaction tuning ---
    private boolean enableAckCompaction = true;
    private int compactAcksAfterNoGC = 10;
    private boolean compactAcksIgnoresStoreGrowth = false;
    private int checkPointCyclesWithNoGC;
    private int journalLogOnLastCompactionCheck;
    private boolean enableSubscriptionStatistics = false;

    //only set when using JournalDiskSyncStrategy.PERIODIC
    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
303
    @Override
    public void doStart() throws Exception {
        // Service lifecycle hook: delegate to load(), which purges (if configured),
        // opens the store and writes a LOADED trace record.
        load();
    }
308
    @Override
    public void doStop(ServiceStopper stopper) throws Exception {
        // Service lifecycle hook: delegate to unload(), which persists the
        // CLOSED_STATE marker and closes the store.
        unload();
    }
313
314    public void allowIOResumption() {
315        if (pageFile != null) {
316            pageFile.allowIOResumption();
317        }
318        if (journal != null) {
319            journal.allowIOResumption();
320        }
321    }
322
    /**
     * Loads the page file and the metadata stored on page 0, creating and
     * initializing both on first use, then pre-loads every stored destination.
     * Runs entirely under the index write lock.
     */
    private void loadPageFile() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            final PageFile pageFile = getPageFile();
            pageFile.load();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    if (pageFile.getPageCount() == 0) {
                        // First time this is created.. Initialize the metadata
                        Page<Metadata> page = tx.allocate();
                        // metadata must live on page 0 — it is loaded by fixed id below
                        assert page.getPageId() == 0;
                        page.set(metadata);
                        metadata.page = page;
                        metadata.state = CLOSED_STATE;
                        metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId());

                        tx.store(metadata.page, metadataMarshaller, true);
                    } else {
                        // Existing store: metadata is always on page 0.
                        Page<Metadata> page = tx.load(0, metadataMarshaller);
                        metadata = page.get();
                        metadata.page = page;
                    }
                    // Marshallers are transient and must be re-attached after every load.
                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
                    metadata.destinations.load(tx);
                }
            });
            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
            // Perhaps we should just keep an index of file
            storedDestinations.clear();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
                        Entry<String, StoredDestination> entry = iterator.next();
                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
                        storedDestinations.put(entry.getKey(), sd);

                        if (checkForCorruptJournalFiles) {
                            // sanity check the index also
                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
                                }
                            }
                        }
                    }
                }
            });
            pageFile.flush();
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
378
379    private void startCheckpoint() {
380        if (checkpointInterval == 0 && cleanupInterval == 0) {
381            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
382            return;
383        }
384        synchronized (schedulerLock) {
385            if (scheduler == null || scheduler.isShutdown()) {
386                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
387
388                    @Override
389                    public Thread newThread(Runnable r) {
390                        Thread schedulerThread = new Thread(r);
391
392                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
393                        schedulerThread.setDaemon(true);
394
395                        return schedulerThread;
396                    }
397                });
398
399                // Short intervals for check-point and cleanups
400                long delay;
401                if (journal.isJournalDiskSyncPeriodic()) {
402                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
403                } else {
404                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
405                }
406
407                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
408            }
409        }
410    }
411
    /**
     * Periodic task scheduled by startCheckpoint(). On each tick it decides,
     * based on elapsed time, whether to trigger a periodic journal disk sync,
     * a cleanup (checkpointCleanup(true)) or a plain checkpoint
     * (checkpointCleanup(false)). All timestamps are wall-clock millis.
     */
    private final class CheckpointRunner {

        private long lastCheckpoint = System.currentTimeMillis();
        private long lastCleanup = System.currentTimeMillis();
        private long lastSync = System.currentTimeMillis();
        // Last journal location for which a sync was already triggered; avoids
        // writing a trace command when nothing new was written since.
        private Location lastAsyncUpdate = null;

        @Override
        public void run() {
            try {
                // Decide on cleanup vs full checkpoint here.
                if (opened.get()) {
                    long now = System.currentTimeMillis();
                    if (journal.isJournalDiskSyncPeriodic() &&
                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
                        Location currentUpdate = lastAsyncJournalUpdate.get();
                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
                            lastAsyncUpdate = currentUpdate;
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Writing trace command to trigger journal sync");
                            }
                            // sync=true store of a no-op trace command forces the journal to disk
                            store(new KahaTraceCommand(), true, null, null);
                        }
                        lastSync = now;
                    }
                    // A cleanup also counts as a checkpoint, hence both timestamps reset.
                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
                        checkpointCleanup(true);
                        lastCleanup = now;
                        lastCheckpoint = now;
                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
                        checkpointCleanup(false);
                        lastCheckpoint = now;
                    }
                }
            } catch (IOException ioe) {
                // delegate to the broker's IO exception handler (may stop/restart the broker)
                LOG.error("Checkpoint failed", ioe);
                brokerService.handleIOException(ioe);
            } catch (Throwable e) {
                LOG.error("Checkpoint failed", e);
                brokerService.handleIOException(IOExceptionSupport.create(e));
            }
        }
    }
455
    /**
     * Opens the store once (guarded by the opened flag): starts the journal,
     * loads the index, replays the journal and starts the checkpoint scheduler.
     * If the index fails to load it is archived or deleted (per
     * archiveCorruptedIndex) and rebuilt from a fresh page file, relying on the
     * subsequent journal replay in recover() to repopulate it.
     */
    public void open() throws IOException {
        if( opened.compareAndSet(false, true) ) {
            getJournal().start();
            try {
                loadPageFile();
            } catch (Throwable t) {
                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Index load failure", t);
                }
                // try to recover index
                try {
                    pageFile.unload();
                } catch (Exception ignore) {}
                if (archiveCorruptedIndex) {
                    pageFile.archive();
                } else {
                    pageFile.delete();
                }
                metadata = createMetadata();
                //The metadata was recreated after a detect corruption so we need to
                //reconfigure anything that was configured on the old metadata on startup
                configureMetadata();
                // force getPageFile() inside loadPageFile() to build a fresh instance
                pageFile = null;
                loadPageFile();
            }
            recover();
            startCheckpoint();
        }
    }
486
    /**
     * Loads the store under the index write lock. When deleteAllMessages is
     * set, the journal and page file are deleted first (a one-shot purge — the
     * flag is cleared afterwards). Finishes by writing a LOADED trace record
     * to the journal.
     */
    public void load() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            IOHelper.mkdirs(directory);
            if (deleteAllMessages) {
                // skip corruption checks: the data is about to be deleted anyway
                getJournal().setCheckForCorruptionOnStartup(false);
                getJournal().start();
                getJournal().delete();
                getJournal().close();
                // null out so open() starts from a freshly created journal
                journal = null;
                getPageFile().delete();
                LOG.info("Persistence store purged.");
                deleteAllMessages = false;
            }

            open();
            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
508
    /**
     * Closes the store once (guarded by the opened flag): runs a final
     * checkpoint under the checkpoint write lock, unloads the page file,
     * closes the journal, stops the scheduler and clears cached state.
     */
    public void close() throws IOException, InterruptedException {
        if (opened.compareAndSet(true, false)) {
            checkpointLock.writeLock().lock();
            try {
                // only checkpoint if the metadata page was ever loaded/initialized
                if (metadata.page != null) {
                    checkpointUpdate(true);
                }
                pageFile.unload();
                metadata = createMetadata();
            } finally {
                checkpointLock.writeLock().unlock();
            }
            journal.close();
            synchronized(schedulerLock) {
                if (scheduler != null) {
                    // -1: wait indefinitely for in-flight checkpoint work to finish
                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
                    scheduler = null;
                }
            }
            // clear the cache and journalSize on shutdown of the store
            storeCache.clear();
            journalSize.set(0);
        }
    }
533
    /**
     * Cleanly shuts the store down: records CLOSED_STATE and the earliest
     * in-progress transaction location in the persisted metadata (so the next
     * start can detect a clean shutdown), then delegates to close().
     */
    public void unload() throws IOException, InterruptedException {
        this.indexLock.writeLock().lock();
        try {
            if( pageFile != null && pageFile.isLoaded() ) {
                metadata.state = CLOSED_STATE;
                // [0] is the minimum (earliest) location of any in-flight/prepared tx
                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];

                if (metadata.page != null) {
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            tx.store(metadata.page, metadataMarshaller, true);
                        }
                    });
                }
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
        close();
    }
555
556    // public for testing
557    @SuppressWarnings("rawtypes")
558    public Location[] getInProgressTxLocationRange() {
559        Location[] range = new Location[]{null, null};
560        synchronized (inflightTransactions) {
561            if (!inflightTransactions.isEmpty()) {
562                for (List<Operation> ops : inflightTransactions.values()) {
563                    if (!ops.isEmpty()) {
564                        trackMaxAndMin(range, ops);
565                    }
566                }
567            }
568            if (!preparedTransactions.isEmpty()) {
569                for (List<Operation> ops : preparedTransactions.values()) {
570                    if (!ops.isEmpty()) {
571                        trackMaxAndMin(range, ops);
572                    }
573                }
574            }
575        }
576        return range;
577    }
578
579    @SuppressWarnings("rawtypes")
580    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
581        Location t = ops.get(0).getLocation();
582        if (range[0] == null || t.compareTo(range[0]) <= 0) {
583            range[0] = t;
584        }
585        t = ops.get(ops.size() -1).getLocation();
586        if (range[1] == null || t.compareTo(range[1]) >= 0) {
587            range[1] = t;
588        }
589    }
590
591    class TranInfo {
592        TransactionId id;
593        Location location;
594
595        class opCount {
596            int add;
597            int remove;
598        }
599        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<>();
600
601        @SuppressWarnings("rawtypes")
602        public void track(Operation operation) {
603            if (location == null ) {
604                location = operation.getLocation();
605            }
606            KahaDestination destination;
607            boolean isAdd = false;
608            if (operation instanceof AddOperation) {
609                AddOperation add = (AddOperation) operation;
610                destination = add.getCommand().getDestination();
611                isAdd = true;
612            } else {
613                RemoveOperation removeOpperation = (RemoveOperation) operation;
614                destination = removeOpperation.getCommand().getDestination();
615            }
616            opCount opCount = destinationOpCount.get(destination);
617            if (opCount == null) {
618                opCount = new opCount();
619                destinationOpCount.put(destination, opCount);
620            }
621            if (isAdd) {
622                opCount.add++;
623            } else {
624                opCount.remove++;
625            }
626        }
627
628        @Override
629        public String toString() {
630           StringBuffer buffer = new StringBuffer();
631           buffer.append(location).append(";").append(id).append(";\n");
632           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
633               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
634           }
635           return buffer.toString();
636        }
637    }
638
639    @SuppressWarnings("rawtypes")
640    public String getTransactions() {
641
642        ArrayList<TranInfo> infos = new ArrayList<>();
643        synchronized (inflightTransactions) {
644            if (!inflightTransactions.isEmpty()) {
645                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
646                    TranInfo info = new TranInfo();
647                    info.id = entry.getKey();
648                    for (Operation operation : entry.getValue()) {
649                        info.track(operation);
650                    }
651                    infos.add(info);
652                }
653            }
654        }
655        synchronized (preparedTransactions) {
656            if (!preparedTransactions.isEmpty()) {
657                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
658                    TranInfo info = new TranInfo();
659                    info.id = entry.getKey();
660                    for (Operation operation : entry.getValue()) {
661                        info.track(operation);
662                    }
663                    infos.add(info);
664                }
665            }
666        }
667        return infos.toString();
668    }
669
670    /**
671     * Move all the messages that were in the journal into long term storage. We
672     * just replay and do a checkpoint.
673     *
674     * @throws IOException
675     * @throws IOException
676     * @throws IllegalStateException
677     */
    private void recover() throws IllegalStateException, IOException {
        this.indexLock.writeLock().lock();
        try {

            long start = System.currentTimeMillis();
            // If either checkpointed snapshot (producer audit / ack-message file map)
            // could not be loaded, it can only be rebuilt by replaying the whole journal.
            boolean requiresJournalReplay = recoverProducerAudit();
            requiresJournalReplay |= recoverAckMessageFileMap();
            Location lastIndoubtPosition = getRecoveryPosition();
            // Full replay starts at the head of the journal; otherwise resume from the
            // first in-doubt (not yet fully indexed) position.
            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
            if (recoveryPosition != null) {
                int redoCounter = 0;
                int dataFileRotationTracker = recoveryPosition.getDataFileId();
                LOG.info("Recovering from the journal @" + recoveryPosition);
                while (recoveryPosition != null) {
                    try {
                        JournalCommand<?> message = load(recoveryPosition);
                        metadata.lastUpdate = recoveryPosition;
                        // Records before lastIndoubtPosition only refresh the producer
                        // audit; records at/after it are applied to the index in full.
                        process(message, recoveryPosition, lastIndoubtPosition);
                        redoCounter++;
                    } catch (IOException failedRecovery) {
                        if (isIgnoreMissingJournalfiles()) {
                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                            // track this dud location
                            journal.corruptRecoveryLocation(recoveryPosition);
                        } else {
                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                        }
                    }
                    recoveryPosition = journal.getNextLocation(recoveryPosition);
                    // hold on to the minimum number of open files during recovery
                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
                        dataFileRotationTracker = recoveryPosition.getDataFileId();
                        journal.cleanup();
                    }
                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
                    }
                }
                if (LOG.isInfoEnabled()) {
                    long end = System.currentTimeMillis();
                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
                }
            }

            // We may have to undo some index updates.
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    recoverIndex(tx);
                }
            });

            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
            Set<TransactionId> toRollback = new HashSet<>();
            Set<TransactionId> toDiscard = new HashSet<>();
            synchronized (inflightTransactions) {
                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
                    TransactionId id = it.next();
                    if (id.isLocalTransaction()) {
                        toRollback.add(id);
                    } else {
                        toDiscard.add(id);
                    }
                }
                // Local transactions are rolled back through the journal so the
                // rollback itself is durable.
                for (TransactionId tx: toRollback) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
                    }
                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
                }
                // Un-prepared XA transactions are simply dropped from the in-flight map.
                for (TransactionId tx: toDiscard) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
                    }
                    inflightTransactions.remove(tx);
                }
            }

            // Prepared XA transactions are handled per the configured purge strategy:
            // kept for the transaction manager, committed, or rolled back.
            synchronized (preparedTransactions) {
                Set<TransactionId> txIds = new LinkedHashSet<TransactionId>(preparedTransactions.keySet());
                for (TransactionId txId : txIds) {
                    switch (purgeRecoveredXATransactionStrategy){
                        case NEVER:
                            LOG.warn("Recovered prepared XA TX: [{}]", txId);
                            break;
                        case COMMIT:
                            store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
                            LOG.warn("Recovered and Committing prepared XA TX: [{}]", txId);
                            break;
                        case ROLLBACK:
                            store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
                            LOG.warn("Recovered and Rolling Back prepared XA TX: [{}]", txId);
                            break;
                    }
                }
            }

        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
779
    /**
     * Converts a broker {@link TransactionId} into the KahaDB local-transaction
     * info form. Currently has no callers (hence the suppression); presumably
     * retained for API/debugging use — confirm before removing.
     */
    @SuppressWarnings("unused")
    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
        return TransactionIdConversion.convertToLocal(tx);
    }
784
785    private Location minimum(Location x,
786                             Location y) {
787        Location min = null;
788        if (x != null) {
789            min = x;
790            if (y != null) {
791                int compare = y.compareTo(x);
792                if (compare < 0) {
793                    min = y;
794                }
795            }
796        } else {
797            min = y;
798        }
799        return min;
800    }
801
802    private boolean recoverProducerAudit() throws IOException {
803        boolean requiresReplay = true;
804        if (metadata.producerSequenceIdTrackerLocation != null) {
805            try {
806                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
807                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
808                int maxNumProducers = getMaxFailoverProducersToTrack();
809                int maxAuditDepth = getFailoverProducersAuditDepth();
810                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
811                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
812                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
813                requiresReplay = false;
814            } catch (Exception e) {
815                LOG.warn("Cannot recover message audit", e);
816            }
817        }
818        // got no audit stored so got to recreate via replay from start of the journal
819        return requiresReplay;
820    }
821
822    @SuppressWarnings("unchecked")
823    private boolean recoverAckMessageFileMap() throws IOException {
824        boolean requiresReplay = true;
825        if (metadata.ackMessageFileMapLocation != null) {
826            try {
827                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
828                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
829                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
830                metadata.ackMessageFileMapDirtyFlag.lazySet(true);
831                requiresReplay = false;
832            } catch (Exception e) {
833                LOG.warn("Cannot recover ackMessageFileMap", e);
834            }
835        }
836        // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
837        return requiresReplay;
838    }
839
    /**
     * Reconciles the index with the journal after a restart: removes index
     * entries that point past the journal's last append position (index got
     * ahead of the journal) and entries that reference missing or corrupt
     * journal files.
     *
     * @param tx the page-file transaction all index updates run in
     * @throws IOException if missing/corrupt journal data is referenced and
     *         ignoreMissingJournalfiles is disabled, or on index access failure
     */
    protected void recoverIndex(Transaction tx) throws IOException {
        long start = System.currentTimeMillis();
        // It is possible index updates got applied before the journal updates..
        // in that case we need to removed references to messages that are not in the journal
        final Location lastAppendLocation = journal.getLastAppendLocation();
        long undoCounter=0;

        // Go through all the destinations to see if they have messages past the lastAppendLocation
        for (String key : storedDestinations.keySet()) {
            StoredDestination sd = storedDestinations.get(key);

            final ArrayList<Long> matches = new ArrayList<>();
            // Find all the Locations that are >= than the last Append Location.
            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
                @Override
                protected void matched(Location key, Long value) {
                    matches.add(value);
                }
            });

            for (Long sequenceId : matches) {
                // Drop the message from all three indexes (order, location,
                // message-id) and roll its id out of the duplicate-producer audit.
                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                if (keys != null) {
                    sd.locationIndex.remove(tx, keys.location);
                    sd.messageIdIndex.remove(tx, keys.messageId);
                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
                    undoCounter++;
                    decrementAndSubSizeToStoreStat(key, keys.location.getSize());
                    // TODO: do we need to modify the ack positions for the pub sub case?
                }
            }
        }

        if (undoCounter > 0) {
            // The rolledback operations are basically in flight journal writes.  To avoid getting
            // these the end user should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }

        undoCounter = 0;
        start = System.currentTimeMillis();

        // Lets be extra paranoid here and verify that all the datafiles being referenced
        // by the indexes still exists.

        final SequenceSet ss = new SequenceSet();
        for (StoredDestination sd : storedDestinations.values()) {
            // Use a visitor to cut down the number of pages that we load
            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                int last=-1;

                @Override
                public boolean isInterestedInKeysBetween(Location first, Location second) {
                    // Skip key ranges whose data-file ids have all been seen already.
                    if( first==null ) {
                        return !ss.contains(0, second.getDataFileId());
                    } else if( second==null ) {
                        return true;
                    } else {
                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
                    }
                }

                @Override
                public void visit(List<Location> keys, List<Long> values) {
                    for (Location l : keys) {
                        int fileId = l.getDataFileId();
                        if( last != fileId ) {
                            ss.add(fileId);
                            last = fileId;
                        }
                    }
                }

            });
        }
        // ss now holds every data-file id referenced by any destination's location index.
        HashSet<Integer> missingJournalFiles = new HashSet<>();
        while (!ss.isEmpty()) {
            missingJournalFiles.add((int) ss.removeFirst());
        }

        // Data files referenced by outstanding acks must exist as well.
        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
            missingJournalFiles.add(entry.getKey());
            for (Integer i : entry.getValue()) {
                missingJournalFiles.add(i);
            }
        }

        // Whatever remains after removing files that actually exist is missing.
        missingJournalFiles.removeAll(journal.getFileMap().keySet());

        if (!missingJournalFiles.isEmpty()) {
            LOG.warn("Some journal files are missing: " + missingJournalFiles);
        }

        // Build location-range predicates covering the missing files and (optionally)
        // any corrupted regions, so affected index entries can be found in one pass.
        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<>();
        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<>();
        for (Integer missing : missingJournalFiles) {
            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
        }

        if (checkForCorruptJournalFiles) {
            Collection<DataFile> dataFiles = journal.getFileMap().values();
            for (DataFile dataFile : dataFiles) {
                int id = dataFile.getDataFileId();
                // eof to next file id
                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                Sequence seq = dataFile.getCorruptedBlocks().getHead();
                while (seq != null) {
                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
                        new BTreeVisitor.BetweenVisitor<>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
                    missingPredicates.add(visitor);
                    knownCorruption.add(visitor);
                    seq = seq.getNext();
                }
            }
        }

        if (!missingPredicates.isEmpty()) {
            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
                final StoredDestination sd = sdEntry.getValue();
                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<>();
                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                    @Override
                    protected void matched(Location key, Long value) {
                        matches.put(value, key);
                    }
                });

                // If some message references are affected by the missing data files...
                if (!matches.isEmpty()) {

                    // We either 'gracefully' recover dropping the missing messages or
                    // we error out.
                    if( ignoreMissingJournalfiles ) {
                        // Update the index to remove the references to the missing data
                        for (Long sequenceId : matches.keySet()) {
                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                            sd.locationIndex.remove(tx, keys.location);
                            sd.messageIdIndex.remove(tx, keys.messageId);
                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
                            undoCounter++;
                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
                            // TODO: do we need to modify the ack positions for the pub sub case?
                        }
                    } else {
                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
                    }
                }
            }
        }

        if (!ignoreMissingJournalfiles) {
            if (!knownCorruption.isEmpty()) {
                LOG.error("Detected corrupt journal files. " + knownCorruption);
                throw new IOException("Detected corrupt journal files. " + knownCorruption);
            }

            if (!missingJournalFiles.isEmpty()) {
                LOG.error("Detected missing journal files. " + missingJournalFiles);
                throw new IOException("Detected missing journal files. " + missingJournalFiles);
            }
        }

        if (undoCounter > 0) {
            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
            // should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }
    }
1015
    // Cursor state for incrementalRecover(): the next journal location to replay,
    // and the last location that was already replayed.
    private Location nextRecoveryPosition;
    private Location lastRecoveryPosition;
1018
1019    public void incrementalRecover() throws IOException {
1020        this.indexLock.writeLock().lock();
1021        try {
1022            if( nextRecoveryPosition == null ) {
1023                if( lastRecoveryPosition==null ) {
1024                    nextRecoveryPosition = getRecoveryPosition();
1025                } else {
1026                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1027                }
1028            }
1029            while (nextRecoveryPosition != null) {
1030                lastRecoveryPosition = nextRecoveryPosition;
1031                metadata.lastUpdate = lastRecoveryPosition;
1032                JournalCommand<?> message = load(lastRecoveryPosition);
1033                process(message, lastRecoveryPosition, (IndexAware) null);
1034                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1035            }
1036        } finally {
1037            this.indexLock.writeLock().unlock();
1038        }
1039    }
1040
    /**
     * @return the journal location of the most recent update applied to the
     *         index, or null if none has been recorded
     * @throws IOException never thrown by this implementation; declared for callers
     */
    public Location getLastUpdatePosition() throws IOException {
        return metadata.lastUpdate;
    }
1044
1045    private Location getRecoveryPosition() throws IOException {
1046
1047        if (!this.forceRecoverIndex) {
1048
1049            // If we need to recover the transactions..
1050            if (metadata.firstInProgressTransactionLocation != null) {
1051                return metadata.firstInProgressTransactionLocation;
1052            }
1053
1054            // Perhaps there were no transactions...
1055            if( metadata.lastUpdate!=null) {
1056                // Start replay at the record after the last one recorded in the index file.
1057                return getNextInitializedLocation(metadata.lastUpdate);
1058            }
1059        }
1060        // This loads the first position.
1061        return journal.getNextLocation(null);
1062    }
1063
1064    private Location getNextInitializedLocation(Location location) throws IOException {
1065        Location mayNotBeInitialized = journal.getNextLocation(location);
1066        if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
1067            // need to init size and type to skip
1068            return journal.getNextLocation(mayNotBeInitialized);
1069        } else {
1070            return mayNotBeInitialized;
1071        }
1072    }
1073
1074    protected void checkpointCleanup(final boolean cleanup) throws IOException {
1075        long start;
1076        this.indexLock.writeLock().lock();
1077        try {
1078            start = System.currentTimeMillis();
1079            if( !opened.get() ) {
1080                return;
1081            }
1082        } finally {
1083            this.indexLock.writeLock().unlock();
1084        }
1085        checkpointUpdate(cleanup);
1086        long end = System.currentTimeMillis();
1087        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1088            if (LOG.isInfoEnabled()) {
1089                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
1090            }
1091        }
1092    }
1093
1094    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
1095        int size = data.serializedSizeFramed();
1096        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
1097        os.writeByte(data.type().getNumber());
1098        data.writeFramed(os);
1099        return os.toByteSequence();
1100    }
1101
1102    // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
1104    // /////////////////////////////////////////////////////////////////
    /**
     * Journals {@code data} without syncing and with no index callback or
     * completion hook.
     */
    public Location store(JournalCommand<?> data) throws IOException {
        return store(data, false, null,null);
    }
1108
    /**
     * Journals {@code data} without syncing; {@code onJournalStoreComplete}
     * is passed through to the journal write.
     */
    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
        return store(data, false, null, null, onJournalStoreComplete);
    }
1112
    /**
     * Journals {@code data} with the given sync mode, index-aware callback and
     * post-store hook, but no journal completion hook.
     */
    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
        return store(data, sync, before, after, null);
    }
1116
1117    /**
     * All updates are funneled through this method. The updates are converted
1119     * to a JournalMessage which is logged to the journal and then the data from
1120     * the JournalMessage is used to update the index just like it would be done
1121     * during a recovery process.
1122     */
    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
        try {
            ByteSequence sequence = toByteSequence(data);
            Location location;

            // Hold the checkpoint read lock so a checkpoint cannot run between
            // the journal append and the matching index update.
            checkpointLock.readLock().lock();
            try {

                long start = System.currentTimeMillis();
                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
                long start2 = System.currentTimeMillis();
                //Track the last async update so we know if we need to sync at the next checkpoint
                if (!sync && journal.isJournalDiskSyncPeriodic()) {
                    lastAsyncJournalUpdate.set(location);
                }
                // Apply the same index update that journal replay would perform.
                process(data, location, before);

                long end = System.currentTimeMillis();
                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
                    }
                }

                persistenceAdapterStatistics.addWriteTime(end - start);

            } finally {
                checkpointLock.readLock().unlock();
            }

            // Run the caller's completion hook outside the checkpoint lock.
            if (after != null) {
                after.run();
            }

            // Without a scheduler there is no periodic checkpoint; trigger one now.
            if (scheduler == null && opened.get()) {
                startCheckpoint();
            }
            return location;
        } catch (IOException ioe) {
            LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
            brokerService.handleIOException(ioe);
            throw ioe;
        }
    }
1167
1168    /**
1169     * Loads a previously stored JournalMessage
1170     *
1171     * @param location
1172     * @return
1173     * @throws IOException
1174     */
    public JournalCommand<?> load(Location location) throws IOException {
        long start = System.currentTimeMillis();
        ByteSequence data = journal.read(location);
        long end = System.currentTimeMillis();
        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
            }
        }

        persistenceAdapterStatistics.addReadTime(end - start);

        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
        // First byte of every record identifies the command type.
        byte readByte = is.readByte();
        KahaEntryType type = KahaEntryType.valueOf(readByte);
        if( type == null ) {
            // Best-effort close before reporting the unrecognized record.
            try {
                is.close();
            } catch (IOException e) {}
            throw new IOException("Could not load journal record, null type information from: " + readByte + " at location: "+location);
        }
        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
        message.mergeFramed(is);
        return message;
    }
1200
1201    /**
1202     * do minimal recovery till we reach the last inDoubtLocation
1203     * @param data
1204     * @param location
1205     * @param inDoubtlocation
1206     * @throws IOException
1207     */
1208    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1209        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1210            process(data, location, (IndexAware) null);
1211        } else {
1212            // just recover producer audit
1213            data.visit(new Visitor() {
1214                @Override
1215                public void visit(KahaAddMessageCommand command) throws IOException {
1216                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1217                }
1218            });
1219        }
1220    }
1221
1222    // /////////////////////////////////////////////////////////////////
1223    // Journaled record processing methods. Once the record is journaled,
1224    // these methods handle applying the index updates. These may be called
1225    // from the recovery method too so they need to be idempotent
1226    // /////////////////////////////////////////////////////////////////
1227
    /**
     * Applies a journaled command to the index by dispatching on its concrete
     * type. Called both on the live store path and during journal replay, so
     * each handler must be idempotent (see the section comment above).
     */
    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
        data.visit(new Visitor() {
            @Override
            public void visit(KahaAddMessageCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRemoveMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaPrepareCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaCommitCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRollbackCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRemoveDestinationCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaSubscriptionCommand command) throws IOException {
                process(command, location);
            }

            // Audit/file-map snapshots and trace records carry no index payload;
            // only the last-update position is advanced.
            @Override
            public void visit(KahaProducerAuditCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaTraceCommand command) {
                processLocation(location);
            }

            @Override
            public void visit(KahaUpdateMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
                process(command, location);
            }
        });
    }
1291
1292    @SuppressWarnings("rawtypes")
1293    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1294        if (command.hasTransactionInfo()) {
1295            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1296            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1297        } else {
1298            this.indexLock.writeLock().lock();
1299            try {
1300                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1301                    @Override
1302                    public void execute(Transaction tx) throws IOException {
1303                        long assignedIndex = updateIndex(tx, command, location);
1304                        if (runWithIndexLock != null) {
1305                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1306                        }
1307                    }
1308                });
1309
1310            } finally {
1311                this.indexLock.writeLock().unlock();
1312            }
1313        }
1314    }
1315
    /**
     * Applies a message-update command to the index under the index write lock.
     */
    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
1329
    /**
     * Handles a remove-message (ack) command. Transactional removes are queued
     * on the in-flight transaction; non-transactional removes update the index
     * immediately under the index write lock.
     */
    @SuppressWarnings("rawtypes")
    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
        if (command.hasTransactionInfo()) {
           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
           inflightTx.add(new RemoveOperation(command, location));
        } else {
            this.indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        updateIndex(tx, command, location);
                    }
                });
            } finally {
                this.indexLock.writeLock().unlock();
            }
        }
    }
1349
    /**
     * Applies a remove-destination command to the index under the index write lock.
     */
    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
1363
    /**
     * Applies a durable-subscription command to the index under the index write lock.
     */
    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
1377
    /**
     * Records {@code location} as the most recent journal update seen by the
     * index. Used for commands that carry no index payload of their own.
     */
    protected void processLocation(final Location location) {
        this.indexLock.writeLock().lock();
        try {
            metadata.lastUpdate = location;
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
1386
    /**
     * Handles a commit command: pulls the transaction's queued operations from
     * the in-flight (or prepared) map and applies them to the index in one
     * page-file transaction, recording ack/message file references as it goes.
     */
    @SuppressWarnings("rawtypes")
    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
        List<Operation> inflightTx;
        synchronized (inflightTransactions) {
            inflightTx = inflightTransactions.remove(key);
            if (inflightTx == null) {
                inflightTx = preparedTransactions.remove(key);
            }
        }
        if (inflightTx == null) {
            // only non persistent messages in this tx
            if (before != null) {
                before.sequenceAssignedWithIndexLocked(-1);
            }
            return;
        }

        final List<Operation> messagingTx = inflightTx;
        indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    for (Operation op : messagingTx) {
                        op.execute(tx);
                        // Tie the commit record's file to each operation's file so
                        // neither is GC'd while the other is still needed.
                        recordAckMessageReferenceLocation(location, op.getLocation());
                    }
                }
            });
            metadata.lastUpdate = location;
        } finally {
            indexLock.writeLock().unlock();
        }
    }
1422
1423    @SuppressWarnings("rawtypes")
1424    protected void process(KahaPrepareCommand command, Location location) {
1425        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1426        List<Operation> tx = null;
1427        synchronized (inflightTransactions) {
1428            tx = inflightTransactions.remove(key);
1429            if (tx != null) {
1430                preparedTransactions.put(key, tx);
1431            }
1432        }
1433        if (tx != null && !tx.isEmpty()) {
1434            indexLock.writeLock().lock();
1435            try {
1436                for (Operation op : tx) {
1437                    recordAckMessageReferenceLocation(location, op.getLocation());
1438                }
1439            } finally {
1440                indexLock.writeLock().unlock();
1441            }
1442        }
1443    }
1444
1445    @SuppressWarnings("rawtypes")
1446    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1447        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1448        List<Operation> updates = null;
1449        synchronized (inflightTransactions) {
1450            updates = inflightTransactions.remove(key);
1451            if (updates == null) {
1452                updates = preparedTransactions.remove(key);
1453            }
1454        }
1455        if (key.isXATransaction() && updates != null && !updates.isEmpty()) {
1456            indexLock.writeLock().lock();
1457            try {
1458                for (Operation op : updates) {
1459                    recordAckMessageReferenceLocation(location, op.getLocation());
1460                }
1461            } finally {
1462                indexLock.writeLock().unlock();
1463            }
1464        }
1465    }
1466
1467    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1468        final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
1469
1470        // Mark the current journal file as a compacted file so that gc checks can skip
1471        // over logs that are smaller compaction type logs.
1472        DataFile current = journal.getDataFileById(location.getDataFileId());
1473        current.setTypeCode(command.getRewriteType());
1474
1475        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1476            // Move offset so that next location read jumps to next file.
1477            location.setOffset(journalMaxFileLength);
1478        }
1479    }
1480
1481    // /////////////////////////////////////////////////////////////////
1482    // These methods do the actual index updates.
1483    // /////////////////////////////////////////////////////////////////
1484
    // Guards the index (page file) state: readers take the read lock, any method
    // that mutates the index takes the write lock.
    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    // Journal data file ids currently being replicated; excluded from GC candidates
    // during checkpointUpdate.
    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<>();
1487
    /**
     * Applies a KahaAddMessageCommand to the destination's indexes (location,
     * messageId and order).
     *
     * @return the assigned sequence id, or -1 when the add was skipped (topic with
     *         no subscriptions) or rejected/ignored as a duplicate.
     */
    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);

        // Skip adding the message to the index if this is a topic and there are
        // no subscriptions.
        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
            return -1;
        }

        // Add the message.
        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
        long id = sd.orderIndex.getNextMessageId();
        Long previous = sd.locationIndex.put(tx, location, id);
        if (previous == null) {
            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
            if (previous == null) {
                // Brand-new message: account for its size and index it for dispatch.
                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
                    addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
                }
                metadata.lastUpdate = location;
            } else {
                // The message id was already indexed: this add is a duplicate.
                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
                }
                // Restore the original id mapping and drop this attempt's location entry.
                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                sd.locationIndex.remove(tx, location);
                id = -1;
            }
        } else {
            // restore the previous value.. Looks like this was a redo of a previously
            // added message. We don't want to assign it a new id as the other indexes would
            // be wrong..
            sd.locationIndex.put(tx, location, previous);
            // ensure sequence is not broken
            sd.orderIndex.revertNextMessageId();
            metadata.lastUpdate = location;
        }
        // record this id in any event, initial send or recovery
        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());

       return id;
    }
1535
1536    void trackPendingAdd(KahaDestination destination, Long seq) {
1537        StoredDestination sd = storedDestinations.get(key(destination));
1538        if (sd != null) {
1539            sd.trackPendingAdd(seq);
1540        }
1541    }
1542
1543    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1544        StoredDestination sd = storedDestinations.get(key(destination));
1545        if (sd != null) {
1546            sd.trackPendingAddComplete(seq);
1547        }
1548    }
1549
    /**
     * Applies a KahaUpdateMessageCommand: re-points an already-indexed message at
     * its new journal location and adjusts store (and, when enabled, per-subscription)
     * size statistics. Falls back to a plain add when the message id is not indexed.
     */
    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
        KahaAddMessageCommand command = updateMessageCommand.getMessage();
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);

        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
        if (id != null) {
            // Replace the order index entry; the previous keys tell us the old location/size.
            MessageKeys previousKeys = sd.orderIndex.put(
                    tx,
                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
                    id,
                    new MessageKeys(command.getMessageId(), location)
            );
            sd.locationIndex.put(tx, location, id);
            incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());

            if (previousKeys != null) {
                //Remove the existing from the size
                decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());

                //update all the subscription metrics
                if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) {
                    // Only subscriptions whose pending set still contains this id are affected.
                    Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx);
                    while (iter.hasNext()) {
                        Entry<String, SequenceSet> e = iter.next();
                        if (e.getValue().contains(id)) {
                            incrementAndAddSizeToStoreStat(key(command.getDestination()), e.getKey(), location.getSize());
                            decrementAndSubSizeToStoreStat(key(command.getDestination()), e.getKey(), previousKeys.location.getSize());
                        }
                    }
                }

                // on first update previous is original location, on recovery/replay it may be the updated location
                if(!previousKeys.location.equals(location)) {
                    sd.locationIndex.remove(tx, previousKeys.location);
                }
            }
            metadata.lastUpdate = location;
        } else {
            //Add the message if it can't be found
            this.updateIndex(tx, command, location);
        }
    }
1592
    /**
     * Applies a KahaRemoveMessageCommand (an ack). Queue acks remove the message
     * from all indexes immediately; topic acks record the subscription's last ack
     * and defer actual removal to removeAckLocation, which deletes the message
     * only once no subscription still references it.
     */
    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        if (!command.hasSubscriptionKey()) {

            // In the queue case we just remove the message from the index..
            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
            if (sequenceId != null) {
                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                if (keys != null) {
                    sd.locationIndex.remove(tx, keys.location);
                    decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
                    // Pin the message's journal file until the ack's file is GC'd.
                    recordAckMessageReferenceLocation(ackLocation, keys.location);
                    metadata.lastUpdate = ackLocation;
                }  else if (LOG.isDebugEnabled()) {
                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("message not found in sequence id index: " + command.getMessageId());
            }
        } else {
            // In the topic case we need remove the message once it's been acked
            // by all the subs
            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());

            // Make sure it's a valid message id...
            if (sequence != null) {
                String subscriptionKey = command.getSubscriptionKey();
                if (command.getAck() != UNMATCHED) {
                    // Load the entry first so lastGetPriority() reflects this message.
                    sd.orderIndex.get(tx, sequence);
                    byte priority = sd.orderIndex.lastGetPriority();
                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
                }

                MessageKeys keys = sd.orderIndex.get(tx, sequence);
                if (keys != null) {
                    recordAckMessageReferenceLocation(ackLocation, keys.location);
                }
                // The following method handles deleting un-referenced messages.
                removeAckLocation(command, tx, sd, subscriptionKey, sequence);
                metadata.lastUpdate = ackLocation;
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
            }

        }
    }
1639
1640    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1641        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1642        if (referenceFileIds == null) {
1643            referenceFileIds = new HashSet<>();
1644            referenceFileIds.add(messageLocation.getDataFileId());
1645            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1646            metadata.ackMessageFileMapDirtyFlag.lazySet(true);
1647
1648        } else {
1649            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1650            if (!referenceFileIds.contains(id)) {
1651                referenceFileIds.add(id);
1652            }
1653        }
1654    }
1655
    /**
     * Removes a destination entirely: clears and frees all of its index pages,
     * then drops it from the in-memory caches, stats, and the destinations map.
     */
    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        sd.orderIndex.remove(tx);

        // For each index: clear entries, unload, then free its root page.
        sd.locationIndex.clear(tx);
        sd.locationIndex.unload(tx);
        tx.free(sd.locationIndex.getPageId());

        sd.messageIdIndex.clear(tx);
        sd.messageIdIndex.unload(tx);
        tx.free(sd.messageIdIndex.getPageId());

        // Topic destinations also carry subscription state.
        if (sd.subscriptions != null) {
            sd.subscriptions.clear(tx);
            sd.subscriptions.unload(tx);
            tx.free(sd.subscriptions.getPageId());

            sd.subscriptionAcks.clear(tx);
            sd.subscriptionAcks.unload(tx);
            tx.free(sd.subscriptionAcks.getPageId());

            sd.ackPositions.clear(tx);
            sd.ackPositions.unload(tx);
            tx.free(sd.ackPositions.getHeadPageId());

            sd.subLocations.clear(tx);
            sd.subLocations.unload(tx);
            tx.free(sd.subLocations.getHeadPageId());
        }

        // Finally remove the destination from the metadata and caches.
        String key = key(command.getDestination());
        storedDestinations.remove(key);
        metadata.destinations.remove(tx, key);
        clearStoreStats(command.getDestination());
        storeCache.remove(key(command.getDestination()));
    }
1692
    /**
     * Creates or destroys a durable subscription on the destination. A command
     * carrying subscription info (re)creates the sub; one without destroys it,
     * and the whole destination is removed once no subscriptions remain.
     */
    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        final String subscriptionKey = command.getSubscriptionKey();

        // If set then we are creating it.. otherwise we are destroying the sub
        if (command.hasSubscriptionInfo()) {
            Location existing = sd.subLocations.get(tx, subscriptionKey);
            if (existing != null && existing.compareTo(location) == 0) {
                // replay on recovery, ignore
                LOG.trace("ignoring journal replay of replay of sub from: " + location);
                return;
            }

            sd.subscriptions.put(tx, subscriptionKey, command);
            sd.subLocations.put(tx, subscriptionKey, location);
            long ackLocation=NOT_ACKED;
            if (!command.getRetroactive()) {
                // Non-retroactive sub starts acked up to the current tail of the order index.
                ackLocation = sd.orderIndex.nextMessageId-1;
            } else {
                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
            }
            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
            sd.subscriptionCache.add(subscriptionKey);
        } else {
            // delete the sub...
            sd.subscriptions.remove(tx, subscriptionKey);
            sd.subLocations.remove(tx, subscriptionKey);
            sd.subscriptionAcks.remove(tx, subscriptionKey);
            sd.subscriptionCache.remove(subscriptionKey);
            removeAckLocationsForSub(command, tx, sd, subscriptionKey);
            MessageStoreSubscriptionStatistics subStats = getSubStats(key(command.getDestination()));
            if (subStats != null) {
                subStats.removeSubscription(subscriptionKey);
            }

            if (sd.subscriptions.isEmpty(tx)) {
                // remove the stored destination
                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
                removeDestinationCommand.setDestination(command.getDestination());
                updateIndex(tx, removeDestinationCommand, null);
                clearStoreStats(command.getDestination());
            }
        }
    }
1737
1738    private void checkpointUpdate(final boolean cleanup) throws IOException {
1739        checkpointLock.writeLock().lock();
1740        try {
1741            this.indexLock.writeLock().lock();
1742            try {
1743                Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() {
1744                    @Override
1745                    public Set<Integer> execute(Transaction tx) throws IOException {
1746                        return checkpointUpdate(tx, cleanup);
1747                    }
1748                });
1749                pageFile.flush();
1750                // after the index update such that partial removal does not leave dangling references in the index.
1751                journal.removeDataFiles(filesToGc);
1752            } finally {
1753                this.indexLock.writeLock().unlock();
1754            }
1755
1756        } finally {
1757            checkpointLock.writeLock().unlock();
1758        }
1759    }
1760
1761    /**
1762     * @param tx
1763     * @throws IOException
1764     */
1765    Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1766        MDC.put("activemq.persistenceDir", getDirectory().getName());
1767        LOG.debug("Checkpoint started.");
1768
1769        // reflect last update exclusive of current checkpoint
1770        Location lastUpdate = metadata.lastUpdate;
1771
1772        metadata.state = OPEN_STATE;
1773        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1774        if (metadata.ackMessageFileMapDirtyFlag.get() || (metadata.ackMessageFileMapLocation == null)) {
1775            metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1776        }
1777        metadata.ackMessageFileMapDirtyFlag.lazySet(false);
1778        Location[] inProgressTxRange = getInProgressTxLocationRange();
1779        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1780        tx.store(metadata.page, metadataMarshaller, true);
1781
1782        final TreeSet<Integer> gcCandidateSet = new TreeSet<>();
1783        if (cleanup) {
1784
1785            final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
1786            gcCandidateSet.addAll(completeFileSet);
1787
1788            if (LOG.isTraceEnabled()) {
1789                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1790            }
1791
1792            if (lastUpdate != null) {
1793                // we won't delete past the last update, ackCompaction journal can be a candidate in error
1794                gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId())));
1795            }
1796
1797            // Don't GC files under replication
1798            if( journalFilesBeingReplicated!=null ) {
1799                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1800            }
1801
1802            if (metadata.producerSequenceIdTrackerLocation != null) {
1803                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1804                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1805                    // rewrite so we don't prevent gc
1806                    metadata.producerSequenceIdTracker.setModified(true);
1807                    if (LOG.isTraceEnabled()) {
1808                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1809                    }
1810                }
1811                gcCandidateSet.remove(dataFileId);
1812                if (LOG.isTraceEnabled()) {
1813                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet);
1814                }
1815            }
1816
1817            if (metadata.ackMessageFileMapLocation != null) {
1818                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1819                gcCandidateSet.remove(dataFileId);
1820                if (LOG.isTraceEnabled()) {
1821                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet);
1822                }
1823            }
1824
1825            // Don't GC files referenced by in-progress tx
1826            if (inProgressTxRange[0] != null) {
1827                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1828                    gcCandidateSet.remove(pendingTx);
1829                }
1830            }
1831            if (LOG.isTraceEnabled()) {
1832                LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1833            }
1834
1835            // Go through all the destinations to see if any of them can remove GC candidates.
1836            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1837                if( gcCandidateSet.isEmpty() ) {
1838                    break;
1839                }
1840
1841                // Use a visitor to cut down the number of pages that we load
1842                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1843                    int last=-1;
1844                    @Override
1845                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1846                        if( first==null ) {
1847                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1848                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1849                                subset.remove(second.getDataFileId());
1850                            }
1851                            return !subset.isEmpty();
1852                        } else if( second==null ) {
1853                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1854                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1855                                subset.remove(first.getDataFileId());
1856                            }
1857                            return !subset.isEmpty();
1858                        } else {
1859                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1860                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1861                                subset.remove(first.getDataFileId());
1862                            }
1863                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1864                                subset.remove(second.getDataFileId());
1865                            }
1866                            return !subset.isEmpty();
1867                        }
1868                    }
1869
1870                    @Override
1871                    public void visit(List<Location> keys, List<Long> values) {
1872                        for (Location l : keys) {
1873                            int fileId = l.getDataFileId();
1874                            if( last != fileId ) {
1875                                gcCandidateSet.remove(fileId);
1876                                last = fileId;
1877                            }
1878                        }
1879                    }
1880                });
1881
1882                // Durable Subscription
1883                if (entry.getValue().subLocations != null) {
1884                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1885                    while (iter.hasNext()) {
1886                        Entry<String, Location> subscription = iter.next();
1887                        int dataFileId = subscription.getValue().getDataFileId();
1888
1889                        // Move subscription along if it has no outstanding messages that need ack'd
1890                        // and its in the last log file in the journal.
1891                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1892                            final StoredDestination destination = entry.getValue();
1893                            final String subscriptionKey = subscription.getKey();
1894                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1895
1896                            // When pending is size one that is the next message Id meaning there
1897                            // are no pending messages currently.
1898                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1899                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1900
1901                                if (LOG.isTraceEnabled()) {
1902                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1903                                }
1904
1905                                final KahaSubscriptionCommand kahaSub =
1906                                    destination.subscriptions.get(tx, subscriptionKey);
1907                                destination.subLocations.put(
1908                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1909
1910                                // Skips the remove from candidates if we rewrote the subscription
1911                                // in order to prevent duplicate subscription commands on recover.
1912                                // If another subscription is on the same file and isn't rewritten
1913                                // than it will remove the file from the set.
1914                                continue;
1915                            }
1916                        }
1917
1918                        gcCandidateSet.remove(dataFileId);
1919                    }
1920                }
1921
1922                if (LOG.isTraceEnabled()) {
1923                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1924                }
1925            }
1926
1927            // check we are not deleting file with ack for in-use journal files
1928            if (LOG.isTraceEnabled()) {
1929                LOG.trace("gc candidates: " + gcCandidateSet);
1930                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1931            }
1932
1933            boolean ackMessageFileMapMod = false;
1934            Iterator<Integer> candidates = gcCandidateSet.iterator();
1935            while (candidates.hasNext()) {
1936                Integer candidate = candidates.next();
1937                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1938                if (referencedFileIds != null) {
1939                    for (Integer referencedFileId : referencedFileIds) {
1940                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1941                            // active file that is not targeted for deletion is referenced so don't delete
1942                            candidates.remove();
1943                            break;
1944                        }
1945                    }
1946                    if (gcCandidateSet.contains(candidate)) {
1947                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1948                        metadata.ackMessageFileMapDirtyFlag.lazySet(true);
1949                    } else {
1950                        if (LOG.isTraceEnabled()) {
1951                            LOG.trace("not removing data file: " + candidate
1952                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1953                        }
1954                    }
1955                }
1956            }
1957
1958            if (!gcCandidateSet.isEmpty()) {
1959                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1960                for (Integer candidate : gcCandidateSet) {
1961                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1962                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1963                        metadata.ackMessageFileMapDirtyFlag.lazySet(true);
1964                    }
1965                }
1966                if (ackMessageFileMapMod) {
1967                    checkpointUpdate(tx, false);
1968                }
1969            } else if (isEnableAckCompaction()) {
1970                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1971                    // First check length of journal to make sure it makes sense to even try.
1972                    //
1973                    // If there is only one journal file with Acks in it we don't need to move
1974                    // it since it won't be chained to any later logs.
1975                    //
1976                    // If the logs haven't grown since the last time then we need to compact
1977                    // otherwise there seems to still be room for growth and we don't need to incur
1978                    // the overhead.  Depending on configuration this check can be avoided and
1979                    // Ack compaction will run any time the store has not GC'd a journal file in
1980                    // the configured amount of cycles.
1981                    if (metadata.ackMessageFileMap.size() > 1 &&
1982                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1983
1984                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
1985                        try {
1986                            scheduler.execute(new AckCompactionRunner());
1987                        } catch (Exception ex) {
1988                            LOG.warn("Error on queueing the Ack Compactor", ex);
1989                        }
1990                    } else {
1991                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1992                    }
1993
1994                    checkPointCyclesWithNoGC = 0;
1995                } else {
1996                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1997                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1998                }
1999
2000                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
2001            }
2002        }
2003        MDC.remove("activemq.persistenceDir");
2004
2005        LOG.debug("Checkpoint done.");
2006        return gcCandidateSet;
2007    }
2008
2009    private final class AckCompactionRunner implements Runnable {
2010
2011        @Override
2012        public void run() {
2013
2014            int journalToAdvance = -1;
2015            Set<Integer> journalLogsReferenced = new HashSet<>();
2016
2017            //flag to know whether the ack forwarding completed without an exception
2018            boolean forwarded = false;
2019
2020            try {
2021                //acquire the checkpoint lock to prevent other threads from
2022                //running a checkpoint while this is running
2023                //
2024                //Normally this task runs on the same executor as the checkpoint task
2025                //so this ack compaction runner wouldn't run at the same time as the checkpoint task.
2026                //
2027                //However, there are two cases where this isn't always true.
2028                //First, the checkpoint() method is public and can be called through the
2029                //PersistenceAdapter interface by someone at the same time this is running.
2030                //Second, a checkpoint is called during shutdown without using the executor.
2031                //
2032                //In the future it might be better to just remove the checkpointLock entirely
2033                //and only use the executor but this would need to be examined for any unintended
2034                //consequences
2035                checkpointLock.readLock().lock();
2036
2037                try {
2038
2039                    // Lock index to capture the ackMessageFileMap data
2040                    indexLock.writeLock().lock();
2041
2042                    // Map keys might not be sorted, find the earliest log file to forward acks
2043                    // from and move only those, future cycles can chip away at more as needed.
2044                    // We won't move files that are themselves rewritten on a previous compaction.
2045                    List<Integer> journalFileIds = new ArrayList<>(metadata.ackMessageFileMap.keySet());
2046                    Collections.sort(journalFileIds);
2047                    for (Integer journalFileId : journalFileIds) {
2048                        DataFile current = journal.getDataFileById(journalFileId);
2049                        if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
2050                            journalToAdvance = journalFileId;
2051                            break;
2052                        }
2053                    }
2054
2055                    // Check if we found one, or if we only found the current file being written to.
2056                    if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
2057                        return;
2058                    }
2059
2060                    journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
2061
2062                } finally {
2063                    indexLock.writeLock().unlock();
2064                }
2065
2066                try {
2067                    // Background rewrite of the old acks
2068                    forwardAllAcks(journalToAdvance, journalLogsReferenced);
2069                    forwarded = true;
2070                } catch (IOException ioe) {
2071                    LOG.error("Forwarding of acks failed", ioe);
2072                    brokerService.handleIOException(ioe);
2073                } catch (Throwable e) {
2074                    LOG.error("Forwarding of acks failed", e);
2075                    brokerService.handleIOException(IOExceptionSupport.create(e));
2076                }
2077            } finally {
2078                checkpointLock.readLock().unlock();
2079            }
2080
2081            try {
2082                if (forwarded) {
2083                    // Checkpoint with changes from the ackMessageFileMap
2084                    checkpointUpdate(false);
2085                }
2086            } catch (IOException ioe) {
2087                LOG.error("Checkpoint failed", ioe);
2088                brokerService.handleIOException(ioe);
2089            } catch (Throwable e) {
2090                LOG.error("Checkpoint failed", e);
2091                brokerService.handleIOException(IOExceptionSupport.create(e));
2092            }
2093        }
2094    }
2095
2096    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
2097        LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
2098
2099        DataFile forwardsFile = journal.reserveDataFile();
2100        forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
2101        LOG.trace("Reserved file for forwarded acks: {}", forwardsFile);
2102
2103        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<>();
2104
2105        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
2106            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
2107            compactionMarker.setSourceDataFileId(journalToRead);
2108            compactionMarker.setRewriteType(forwardsFile.getTypeCode());
2109
2110            ByteSequence payload = toByteSequence(compactionMarker);
2111            appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2112            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
2113
2114            final Location limit = new Location(journalToRead + 1, 0);
2115            Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit);
2116            while (nextLocation != null) {
2117                JournalCommand<?> command = null;
2118                try {
2119                    command = load(nextLocation);
2120                } catch (IOException ex) {
2121                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
2122                }
2123
2124                if (shouldForward(command)) {
2125                    payload = toByteSequence(command);
2126                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2127                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
2128                }
2129
2130                nextLocation = getNextLocationForAckForward(nextLocation, limit);
2131            }
2132        }
2133
2134        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
2135
2136        // Lock index while we update the ackMessageFileMap.
2137        indexLock.writeLock().lock();
2138
2139        // Update the ack map with the new locations of the acks
2140        for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
2141            Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
2142            if (referenceFileIds == null) {
2143                referenceFileIds = new HashSet<>();
2144                referenceFileIds.addAll(entry.getValue());
2145                metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
2146                metadata.ackMessageFileMapDirtyFlag.lazySet(true);
2147            } else {
2148                referenceFileIds.addAll(entry.getValue());
2149            }
2150        }
2151
2152        // remove the old location data from the ack map so that the old journal log file can
2153        // be removed on next GC.
2154        metadata.ackMessageFileMap.remove(journalToRead);
2155        metadata.ackMessageFileMapDirtyFlag.lazySet(true);
2156
2157        indexLock.writeLock().unlock();
2158
2159        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
2160    }
2161
2162    private boolean shouldForward(JournalCommand<?> command) {
2163        boolean result = false;
2164        if (command != null) {
2165            if (command instanceof KahaRemoveMessageCommand) {
2166                result = true;
2167            } else if (command instanceof KahaCommitCommand) {
2168                KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command;
2169                if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) {
2170                    result = true;
2171                }
2172            }
2173        }
2174        return result;
2175    }
2176
2177    private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {
2178        //getNextLocation() can throw an IOException, we should handle it and set
2179        //nextLocation to null and abort gracefully
2180        //Should not happen in the normal case
2181        Location location = null;
2182        try {
2183            location = journal.getNextLocation(nextLocation, limit);
2184        } catch (IOException e) {
2185            LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e);
2186            if (LOG.isDebugEnabled()) {
2187                LOG.debug("Failed to load next journal location after: {}", nextLocation, e);
2188            }
2189        }
2190        return location;
2191    }
2192
    // No-op completion callback handed to journal writes whose callers wait on
    // the returned Location's latch themselves; per the checkpoint methods below,
    // using a completion callback allows the disk sync to be avoided when
    // enableJournalDiskSyncs = false.
    final Runnable nullCompletionCallback = new Runnable() {
        @Override
        public void run() {
        }
    };
2198
2199    private Location checkpointProducerAudit() throws IOException {
2200        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
2201            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2202            ObjectOutputStream oout = new ObjectOutputStream(baos);
2203            oout.writeObject(metadata.producerSequenceIdTracker);
2204            oout.flush();
2205            oout.close();
2206            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2207            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2208            try {
2209                location.getLatch().await();
2210                if (location.getException().get() != null) {
2211                    throw location.getException().get();
2212                }
2213            } catch (InterruptedException e) {
2214                throw new InterruptedIOException(e.toString());
2215            }
2216            return location;
2217        }
2218        return metadata.producerSequenceIdTrackerLocation;
2219    }
2220
2221    private Location checkpointAckMessageFileMap() throws IOException {
2222        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2223        ObjectOutputStream oout = new ObjectOutputStream(baos);
2224        oout.writeObject(metadata.ackMessageFileMap);
2225        oout.flush();
2226        oout.close();
2227        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2228        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2229        try {
2230            location.getLatch().await();
2231        } catch (InterruptedException e) {
2232            throw new InterruptedIOException(e.toString());
2233        }
2234        return location;
2235    }
2236
2237    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2238
2239        ByteSequence sequence = toByteSequence(subscription);
2240        Location location = journal.write(sequence, nullCompletionCallback) ;
2241
2242        try {
2243            location.getLatch().await();
2244        } catch (InterruptedException e) {
2245            throw new InterruptedIOException(e.toString());
2246        }
2247        return location;
2248    }
2249
    /**
     * @return the live set of journal data file ids currently being replicated;
     *         note this exposes the internal mutable set, not a copy.
     */
    public HashSet<Integer> getJournalFilesBeingReplicated() {
        return journalFilesBeingReplicated;
    }
2253
2254    // /////////////////////////////////////////////////////////////////
2255    // StoredDestination related implementation methods.
2256    // /////////////////////////////////////////////////////////////////
2257
    // Cache of loaded StoredDestination index structures keyed by the destination
    // key; populated lazily by getStoredDestination().
    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<>();
2259
2260    static class MessageKeys {
2261        final String messageId;
2262        final Location location;
2263
2264        public MessageKeys(String messageId, Location location) {
2265            this.messageId=messageId;
2266            this.location=location;
2267        }
2268
2269        @Override
2270        public String toString() {
2271            return "["+messageId+","+location+"]";
2272        }
2273    }
2274
2275    protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2276        final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
2277
2278        @Override
2279        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2280            return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
2281        }
2282
2283        @Override
2284        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2285            dataOut.writeUTF(object.messageId);
2286            locationSizeMarshaller.writePayload(object.location, dataOut);
2287        }
2288    }
2289
2290    class LastAck {
2291        long lastAckedSequence;
2292        byte priority;
2293
2294        public LastAck(LastAck source) {
2295            this.lastAckedSequence = source.lastAckedSequence;
2296            this.priority = source.priority;
2297        }
2298
2299        public LastAck() {
2300            this.priority = MessageOrderIndex.HI;
2301        }
2302
2303        public LastAck(long ackLocation) {
2304            this.lastAckedSequence = ackLocation;
2305            this.priority = MessageOrderIndex.LO;
2306        }
2307
2308        public LastAck(long ackLocation, byte priority) {
2309            this.lastAckedSequence = ackLocation;
2310            this.priority = priority;
2311        }
2312
2313        @Override
2314        public String toString() {
2315            return "[" + lastAckedSequence + ":" + priority + "]";
2316        }
2317    }
2318
2319    protected class LastAckMarshaller implements Marshaller<LastAck> {
2320
2321        @Override
2322        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2323            dataOut.writeLong(object.lastAckedSequence);
2324            dataOut.writeByte(object.priority);
2325        }
2326
2327        @Override
2328        public LastAck readPayload(DataInput dataIn) throws IOException {
2329            LastAck lastAcked = new LastAck();
2330            lastAcked.lastAckedSequence = dataIn.readLong();
2331            if (metadata.version >= 3) {
2332                lastAcked.priority = dataIn.readByte();
2333            }
2334            return lastAcked;
2335        }
2336
2337        @Override
2338        public int getFixedSize() {
2339            return 9;
2340        }
2341
2342        @Override
2343        public LastAck deepCopy(LastAck source) {
2344            return new LastAck(source);
2345        }
2346
2347        @Override
2348        public boolean isDeepCopySupported() {
2349            return true;
2350        }
2351    }
2352
    /**
     * Per-destination bundle of index structures: the priority-aware message
     * order index plus location and message-id lookups, with additional
     * subscription indexes that are populated only for topics.
     */
    class StoredDestination {

        MessageOrderIndex orderIndex = new MessageOrderIndex();
        // Location -> sequence id lookup.
        BTreeIndex<Location, Long> locationIndex;
        // Message id -> sequence id lookup.
        BTreeIndex<String, Long> messageIdIndex;

        // These bits are only set for Topics
        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
        BTreeIndex<String, LastAck> subscriptionAcks;
        HashMap<String, MessageOrderCursor> subscriptionCursors;
        // Subscription key -> set of pending (unacked) message sequences.
        ListIndex<String, SequenceSet> ackPositions;
        // Subscription key -> journal location of the subscription command.
        ListIndex<String, Location> subLocations;

        // Transient data used to track which Messages are no longer needed.
        // Insertion-ordered (LinkedHashSet) set of subscription keys.
        final HashSet<String> subscriptionCache = new LinkedHashSet<>();

        public void trackPendingAdd(Long seq) {
            orderIndex.trackPendingAdd(seq);
        }

        public void trackPendingAddComplete(Long seq) {
            orderIndex.trackPendingAddComplete(seq);
        }

        @Override
        public String toString() {
            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
        }
    }
2382
    /**
     * Marshals a StoredDestination's index page ids. readPayload additionally
     * performs in-place upgrades of older metadata versions, so the order and
     * number of reads from {@code dataIn} is version-dependent and must be
     * mirrored exactly by writePayload for the current version.
     */
    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {

        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();

        @Override
        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
            final StoredDestination value = new StoredDestination();
            // Core indexes are stored as page ids in a fixed order.
            value.orderIndex.defaultPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
            value.locationIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
            value.messageIdIndex = new BTreeIndex<>(pageFile, dataIn.readLong());

            // The boolean flags the presence of the topic-only indexes.
            if (dataIn.readBoolean()) {
                value.subscriptions = new BTreeIndex<>(pageFile, dataIn.readLong());
                value.subscriptionAcks = new BTreeIndex<>(pageFile, dataIn.readLong());
                if (metadata.version >= 4) {
                    value.ackPositions = new ListIndex<>(pageFile, dataIn.readLong());
                } else {
                    // upgrade: versions < 4 stored ack positions in a different structure.
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<>();

                            if (metadata.version >= 3) {
                                // migrate: version 3 kept a sequence -> subscription-keys BTree;
                                // invert it into per-subscription SequenceSets.
                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
                                        new BTreeIndex<>(pageFile, dataIn.readLong());
                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
                                oldAckPositions.load(tx);


                                // Do the initial build of the data in memory before writing into the store
                                // based Ack Positions List to avoid a lot of disk thrashing.
                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
                                while (iterator.hasNext()) {
                                    Entry<Long, HashSet<String>> entry = iterator.next();

                                    for(String subKey : entry.getValue()) {
                                        SequenceSet pendingAcks = temp.get(subKey);
                                        if (pendingAcks == null) {
                                            pendingAcks = new SequenceSet();
                                            temp.put(subKey, pendingAcks);
                                        }

                                        pendingAcks.add(entry.getKey());
                                    }
                                }
                            }
                            // Now move the pending messages to ack data into the store backed
                            // structure.
                            value.ackPositions = new ListIndex<>(pageFile, tx.allocate());
                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
                            value.ackPositions.load(tx);
                            for(String subscriptionKey : temp.keySet()) {
                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
                            }

                        }
                    });
                }

                if (metadata.version >= 5) {
                    value.subLocations = new ListIndex<>(pageFile, dataIn.readLong());
                } else {
                    // upgrade: versions < 5 had no subscription-location list; allocate an empty one.
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            value.subLocations = new ListIndex<>(pageFile, tx.allocate());
                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
                            value.subLocations.load(tx);
                        }
                    });
                }
            }
            if (metadata.version >= 2) {
                value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
                value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
            } else {
                // upgrade: version 1 had no priority bands; allocate fresh low/high indexes.
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                        value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
                        value.orderIndex.lowPriorityIndex.load(tx);

                        value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                        value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
                        value.orderIndex.highPriorityIndex.load(tx);
                    }
                });
            }

            return value;
        }

        @Override
        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
            // Write order must mirror the current-version read order above.
            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
            dataOut.writeLong(value.locationIndex.getPageId());
            dataOut.writeLong(value.messageIdIndex.getPageId());
            if (value.subscriptions != null) {
                dataOut.writeBoolean(true);
                dataOut.writeLong(value.subscriptions.getPageId());
                dataOut.writeLong(value.subscriptionAcks.getPageId());
                dataOut.writeLong(value.ackPositions.getHeadPageId());
                dataOut.writeLong(value.subLocations.getHeadPageId());
            } else {
                dataOut.writeBoolean(false);
            }
            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
        }
    }
2503
2504    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2505        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2506
2507        @Override
2508        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2509            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2510            rc.mergeFramed((InputStream)dataIn);
2511            return rc;
2512        }
2513
2514        @Override
2515        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2516            object.writeFramed((OutputStream)dataOut);
2517        }
2518    }
2519
2520    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2521        String key = key(destination);
2522        StoredDestination rc = storedDestinations.get(key);
2523        if (rc == null) {
2524            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2525            rc = loadStoredDestination(tx, key, topic);
2526            // Cache it. We may want to remove/unload destinations from the
2527            // cache that are not used for a while
2528            // to reduce memory usage.
2529            storedDestinations.put(key, rc);
2530        }
2531        return rc;
2532    }
2533
2534    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2535        String key = key(destination);
2536        StoredDestination rc = storedDestinations.get(key);
2537        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2538            rc = getStoredDestination(destination, tx);
2539        }
2540        return rc;
2541    }
2542
2543    /**
2544     * @param tx
2545     * @param key
2546     * @param topic
2547     * @return
2548     * @throws IOException
2549     */
2550    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2551        // Try to load the existing indexes..
2552        StoredDestination rc = metadata.destinations.get(tx, key);
2553        if (rc == null) {
2554            // Brand new destination.. allocate indexes for it.
2555            rc = new StoredDestination();
2556            rc.orderIndex.allocate(tx);
2557            rc.locationIndex = new BTreeIndex<>(pageFile, tx.allocate());
2558            rc.messageIdIndex = new BTreeIndex<>(pageFile, tx.allocate());
2559
2560            if (topic) {
2561                rc.subscriptions = new BTreeIndex<>(pageFile, tx.allocate());
2562                rc.subscriptionAcks = new BTreeIndex<>(pageFile, tx.allocate());
2563                rc.ackPositions = new ListIndex<>(pageFile, tx.allocate());
2564                rc.subLocations = new ListIndex<>(pageFile, tx.allocate());
2565            }
2566            metadata.destinations.put(tx, key, rc);
2567        }
2568
2569        // Configure the marshalers and load.
2570        rc.orderIndex.load(tx);
2571
2572        // Figure out the next key using the last entry in the destination.
2573        rc.orderIndex.configureLast(tx);
2574
2575        rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
2576        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2577        rc.locationIndex.load(tx);
2578
2579        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2580        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2581        rc.messageIdIndex.load(tx);
2582
2583        //go through an upgrade old index if older than version 6
2584        if (metadata.version < 6) {
2585            for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) {
2586                Entry<Location, Long> entry = iterator.next();
2587                // modify so it is upgraded
2588                rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
2589            }
2590            //upgrade the order index
2591            for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) {
2592                Entry<Long, MessageKeys> entry = iterator.next();
2593                //call get so that the last priority is updated
2594                rc.orderIndex.get(tx, entry.getKey());
2595                rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
2596            }
2597        }
2598
2599        // If it was a topic...
2600        if (topic) {
2601
2602            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2603            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2604            rc.subscriptions.load(tx);
2605
2606            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2607            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2608            rc.subscriptionAcks.load(tx);
2609
2610            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2611            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2612            rc.ackPositions.load(tx);
2613
2614            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2615            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2616            rc.subLocations.load(tx);
2617
2618            rc.subscriptionCursors = new HashMap<>();
2619
2620            if (metadata.version < 3) {
2621
2622                // on upgrade need to fill ackLocation with available messages past last ack
2623                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2624                    Entry<String, LastAck> entry = iterator.next();
2625                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2626                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2627                        Long sequence = orderIterator.next().getKey();
2628                        addAckLocation(tx, rc, sequence, entry.getKey());
2629                    }
2630                    // modify so it is upgraded
2631                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2632                }
2633            }
2634
2635            // Configure the message references index
2636
2637
2638            // Configure the subscription cache
2639            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2640                Entry<String, LastAck> entry = iterator.next();
2641                rc.subscriptionCache.add(entry.getKey());
2642            }
2643
2644            if (rc.orderIndex.nextMessageId == 0) {
2645                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2646                if (!rc.subscriptionAcks.isEmpty(tx)) {
2647                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2648                        Entry<String, LastAck> entry = iterator.next();
2649                        rc.orderIndex.nextMessageId =
2650                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2651                    }
2652                }
2653            } else {
2654                // update based on ackPositions for unmatched, last entry is always the next
2655                Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2656                while (subscriptions.hasNext()) {
2657                    Entry<String, SequenceSet> subscription = subscriptions.next();
2658                    SequenceSet pendingAcks = subscription.getValue();
2659                    if (pendingAcks != null && !pendingAcks.isEmpty()) {
2660                        for (Long sequenceId : pendingAcks) {
2661                            rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId);
2662                        }
2663                    }
2664                }
2665            }
2666        }
2667
2668        if (metadata.version < VERSION) {
2669            // store again after upgrade
2670            metadata.destinations.put(tx, key, rc);
2671        }
2672        return rc;
2673    }
2674
2675    /**
2676     * Clear the counter for the destination, if one exists.
2677     *
2678     * @param kahaDestination
2679     */
2680    protected void clearStoreStats(KahaDestination kahaDestination) {
2681        String key = key(kahaDestination);
2682        MessageStoreStatistics storeStats = getStoreStats(key);
2683        MessageStoreSubscriptionStatistics subStats = getSubStats(key);
2684        if (storeStats != null) {
2685            storeStats.reset();
2686        }
2687        if (subStats != null) {
2688            subStats.reset();
2689        }
2690    }
2691
2692    /**
2693     * Update MessageStoreStatistics
2694     *
2695     * @param kahaDestination
2696     * @param size
2697     */
2698    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
2699        incrementAndAddSizeToStoreStat(key(kahaDestination), size);
2700    }
2701
2702    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
2703        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2704        if (storeStats != null) {
2705            storeStats.getMessageCount().increment();
2706            if (size > 0) {
2707                storeStats.getMessageSize().addSize(size);
2708            }
2709        }
2710    }
2711
2712    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
2713        decrementAndSubSizeToStoreStat(key(kahaDestination), size);
2714    }
2715
2716    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
2717        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2718        if (storeStats != null) {
2719            storeStats.getMessageCount().decrement();
2720            if (size > 0) {
2721                storeStats.getMessageSize().addSize(-size);
2722            }
2723        }
2724    }
2725
2726    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
2727        incrementAndAddSizeToStoreStat(key(kahaDestination), subKey, size);
2728    }
2729
2730    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size) {
2731        if (enableSubscriptionStatistics) {
2732            MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
2733            if (subStats != null && subKey != null) {
2734                subStats.getMessageCount(subKey).increment();
2735                if (size > 0) {
2736                    subStats.getMessageSize(subKey).addSize(size);
2737                }
2738            }
2739        }
2740    }
2741
2742
2743    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size) {
2744        if (enableSubscriptionStatistics) {
2745            MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
2746            if (subStats != null && subKey != null) {
2747                subStats.getMessageCount(subKey).decrement();
2748                if (size > 0) {
2749                    subStats.getMessageSize(subKey).addSize(-size);
2750                }
2751            }
2752        }
2753    }
2754
2755    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
2756        decrementAndSubSizeToStoreStat(key(kahaDestination), subKey, size);
2757    }
2758
2759    /**
2760     * This is a map to cache MessageStores for a specific
2761     * KahaDestination key
2762     */
2763    protected final ConcurrentMap<String, MessageStore> storeCache =
2764            new ConcurrentHashMap<>();
2765
2766    /**
2767     * Locate the storeMessageSize counter for this KahaDestination
2768     */
2769    protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
2770        MessageStoreStatistics storeStats = null;
2771        try {
2772            MessageStore messageStore = storeCache.get(kahaDestKey);
2773            if (messageStore != null) {
2774                storeStats = messageStore.getMessageStoreStatistics();
2775            }
2776        } catch (Exception e1) {
2777             LOG.error("Getting size counter of destination failed", e1);
2778        }
2779
2780        return storeStats;
2781    }
2782
2783    protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) {
2784        MessageStoreSubscriptionStatistics subStats = null;
2785        try {
2786            MessageStore messageStore = storeCache.get(kahaDestKey);
2787            if (messageStore instanceof TopicMessageStore) {
2788                subStats = ((TopicMessageStore)messageStore).getMessageStoreSubStatistics();
2789            }
2790        } catch (Exception e1) {
2791             LOG.error("Getting size counter of destination failed", e1);
2792        }
2793
2794        return subStats;
2795    }
2796
2797    /**
2798     * Determine whether this Destination matches the DestinationType
2799     *
2800     * @param destination
2801     * @param type
2802     * @return
2803     */
2804    protected boolean matchType(Destination destination,
2805            KahaDestination.DestinationType type) {
2806        if (destination instanceof Topic
2807                && type.equals(KahaDestination.DestinationType.TOPIC)) {
2808            return true;
2809        } else if (destination instanceof Queue
2810                && type.equals(KahaDestination.DestinationType.QUEUE)) {
2811            return true;
2812        }
2813        return false;
2814    }
2815
2816    class LocationSizeMarshaller implements Marshaller<Location> {
2817
2818        public LocationSizeMarshaller() {
2819
2820        }
2821
2822        @Override
2823        public Location readPayload(DataInput dataIn) throws IOException {
2824            Location rc = new Location();
2825            rc.setDataFileId(dataIn.readInt());
2826            rc.setOffset(dataIn.readInt());
2827            if (metadata.version >= 6) {
2828                rc.setSize(dataIn.readInt());
2829            }
2830            return rc;
2831        }
2832
2833        @Override
2834        public void writePayload(Location object, DataOutput dataOut)
2835                throws IOException {
2836            dataOut.writeInt(object.getDataFileId());
2837            dataOut.writeInt(object.getOffset());
2838            dataOut.writeInt(object.getSize());
2839        }
2840
2841        @Override
2842        public int getFixedSize() {
2843            return 12;
2844        }
2845
2846        @Override
2847        public Location deepCopy(Location source) {
2848            return new Location(source);
2849        }
2850
2851        @Override
2852        public boolean isDeepCopySupported() {
2853            return true;
2854        }
2855    }
2856
2857    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2858        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2859        if (sequences == null) {
2860            sequences = new SequenceSet();
2861            sequences.add(messageSequence);
2862            sd.ackPositions.add(tx, subscriptionKey, sequences);
2863        } else {
2864            sequences.add(messageSequence);
2865            sd.ackPositions.put(tx, subscriptionKey, sequences);
2866        }
2867    }
2868
2869    // new sub is interested in potentially all existing messages
2870    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2871        SequenceSet allOutstanding = new SequenceSet();
2872        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2873        while (iterator.hasNext()) {
2874            SequenceSet set = iterator.next().getValue();
2875            for (Long entry : set) {
2876                allOutstanding.add(entry);
2877            }
2878        }
2879        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2880    }
2881
2882    // on a new message add, all existing subs are interested in this message
2883    private void addAckLocationForNewMessage(Transaction tx, KahaDestination kahaDest,
2884            StoredDestination sd, Long messageSequence) throws IOException {
2885        for(String subscriptionKey : sd.subscriptionCache) {
2886            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2887            if (sequences == null) {
2888                sequences = new SequenceSet();
2889                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2890                sd.ackPositions.add(tx, subscriptionKey, sequences);
2891            } else {
2892                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2893                sd.ackPositions.put(tx, subscriptionKey, sequences);
2894            }
2895
2896            MessageKeys key = sd.orderIndex.get(tx, messageSequence);
2897            incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey, key.location.getSize());
2898        }
2899    }
2900
2901    private void removeAckLocationsForSub(KahaSubscriptionCommand command,
2902            Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2903        if (!sd.ackPositions.isEmpty(tx)) {
2904            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2905            if (sequences == null || sequences.isEmpty()) {
2906                return;
2907            }
2908
2909            ArrayList<Long> unreferenced = new ArrayList<>();
2910
2911            for(Long sequenceId : sequences) {
2912                if(!isSequenceReferenced(tx, sd, sequenceId)) {
2913                    unreferenced.add(sequenceId);
2914                }
2915            }
2916
2917            for(Long sequenceId : unreferenced) {
2918                // Find all the entries that need to get deleted.
2919                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
2920                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2921
2922                // Do the actual deletes.
2923                for (Entry<Long, MessageKeys> entry : deletes) {
2924                    sd.locationIndex.remove(tx, entry.getValue().location);
2925                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2926                    sd.orderIndex.remove(tx, entry.getKey());
2927                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2928                }
2929            }
2930        }
2931    }
2932
2933    private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException {
2934        for(String subscriptionKey : sd.subscriptionCache) {
2935            SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey);
2936            if (sequence != null && sequence.contains(sequenceId)) {
2937                return true;
2938            }
2939        }
2940        return false;
2941    }
2942
2943    /**
2944     * @param tx
2945     * @param sd
2946     * @param subscriptionKey
2947     * @param messageSequence
2948     * @throws IOException
2949     */
2950    private void removeAckLocation(KahaRemoveMessageCommand command,
2951            Transaction tx, StoredDestination sd, String subscriptionKey,
2952            Long messageSequence) throws IOException {
2953        // Remove the sub from the previous location set..
2954        if (messageSequence != null) {
2955            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2956            if (range != null && !range.isEmpty()) {
2957                range.remove(messageSequence);
2958                if (!range.isEmpty()) {
2959                    sd.ackPositions.put(tx, subscriptionKey, range);
2960                } else {
2961                    sd.ackPositions.remove(tx, subscriptionKey);
2962                }
2963
2964                MessageKeys key = sd.orderIndex.get(tx, messageSequence);
2965                decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey,
2966                        key.location.getSize());
2967
2968                // Check if the message is reference by any other subscription.
2969                if (isSequenceReferenced(tx, sd, messageSequence)) {
2970                    return;
2971                }
2972                // Find all the entries that need to get deleted.
2973                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
2974                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2975
2976                // Do the actual deletes.
2977                for (Entry<Long, MessageKeys> entry : deletes) {
2978                    sd.locationIndex.remove(tx, entry.getValue().location);
2979                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2980                    sd.orderIndex.remove(tx, entry.getKey());
2981                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2982                }
2983            }
2984        }
2985    }
2986
    /**
     * Look up the last ack recorded for the given subscription, or null when
     * the subscription has no ack entry in the index.
     */
    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
        return sd.subscriptionAcks.get(tx, subscriptionKey);
    }
2990
2991    protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2992        if (sd.ackPositions != null) {
2993            final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2994            return messageSequences;
2995        }
2996
2997        return null;
2998    }
2999
3000    protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
3001        if (sd.ackPositions != null) {
3002            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
3003            if (messageSequences != null) {
3004                long result = messageSequences.rangeSize();
3005                // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
3006                return result > 0 ? result - 1 : 0;
3007            }
3008        }
3009
3010        return 0;
3011    }
3012
3013    /**
3014     * Recovers durable subscription pending message size with only 1 pass over the order index on recovery
3015     * instead of iterating over the index once per subscription
3016     *
3017     * @param tx
3018     * @param sd
3019     * @param subscriptionKeys
3020     * @return
3021     * @throws IOException
3022     */
3023    protected Map<String, AtomicLong> getStoredMessageSize(Transaction tx, StoredDestination sd, List<String> subscriptionKeys) throws IOException {
3024
3025        final Map<String, AtomicLong> subPendingMessageSizes = new HashMap<>();
3026        final Map<String, SequenceSet> messageSequencesMap = new HashMap<>();
3027
3028        if (sd.ackPositions != null) {
3029            Long recoveryPosition = null;
3030            //Go through each subscription and find matching ackPositions and their first
3031            //position to find the initial recovery position which is the first message across all subs
3032            //that needs to still be acked
3033            for (String subscriptionKey : subscriptionKeys) {
3034                subPendingMessageSizes.put(subscriptionKey, new AtomicLong());
3035                final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
3036                if (messageSequences != null && !messageSequences.isEmpty()) {
3037                    final long head = messageSequences.getHead().getFirst();
3038                    recoveryPosition = recoveryPosition != null ? Math.min(recoveryPosition, head) : head;
3039                    //cache the SequenceSet to speed up recovery of metrics below and avoid a second index hit
3040                    messageSequencesMap.put(subscriptionKey, messageSequences);
3041                }
3042            }
3043            recoveryPosition = recoveryPosition != null ? recoveryPosition : 0;
3044
3045            final Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
3046                    new MessageOrderCursor(recoveryPosition));
3047
3048            //iterate through all messages starting at the recovery position to recover metrics
3049            while (iterator.hasNext()) {
3050                final Entry<Long, MessageKeys> messageEntry = iterator.next();
3051
3052                //For each message in the index check if each subscription needs to ack the message still
3053                //if the ackPositions SequenceSet contains the message then it has not been acked and should be
3054                //added to the pending metrics for that subscription
3055                for (Entry<String, SequenceSet> seqEntry : messageSequencesMap.entrySet()) {
3056                    final String subscriptionKey = seqEntry.getKey();
3057                    final SequenceSet messageSequences = messageSequencesMap.get(subscriptionKey);
3058                    if (messageSequences.contains(messageEntry.getKey())) {
3059                        subPendingMessageSizes.get(subscriptionKey).addAndGet(messageEntry.getValue().location.getSize());
3060                    }
3061                }
3062            }
3063        }
3064
3065        return subPendingMessageSizes;
3066    }
3067
3068    protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
3069        long locationSize = 0;
3070
3071        if (sd.ackPositions != null) {
3072            //grab the messages attached to this subscription
3073            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
3074
3075            if (messageSequences != null && !messageSequences.isEmpty()) {
3076                final Sequence head = messageSequences.getHead();
3077
3078                //get an iterator over the order index starting at the first unacked message
3079                //and go over each message to add up the size
3080                Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
3081                        new MessageOrderCursor(head.getFirst()));
3082
3083                final boolean contiguousRange = messageSequences.size() == 1;
3084                while (iterator.hasNext()) {
3085                    Entry<Long, MessageKeys> entry = iterator.next();
3086                    //Verify sequence contains the key
3087                    //if contiguous we just add all starting with the first but if not
3088                    //we need to check if the id is part of the range - could happen if individual ack mode was used
3089                    if (contiguousRange || messageSequences.contains(entry.getKey())) {
3090                        locationSize += entry.getValue().location.getSize();
3091                    }
3092                }
3093            }
3094        }
3095
3096        return locationSize;
3097    }
3098
3099    protected String key(KahaDestination destination) {
3100        return destination.getType().getNumber() + ":" + destination.getName();
3101    }
3102
3103    // /////////////////////////////////////////////////////////////////
3104    // Transaction related implementation methods.
3105    // /////////////////////////////////////////////////////////////////
3106    @SuppressWarnings("rawtypes")
3107    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<>();
3108    @SuppressWarnings("rawtypes")
3109    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<>();
3110    protected final Set<String> ackedAndPrepared = new HashSet<>();
3111    protected final Set<String> rolledBackAcks = new HashSet<>();
3112
3113    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
3114    // till then they are skipped by the store.
3115    // 'at most once' XA guarantee
3116    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
3117        this.indexLock.writeLock().lock();
3118        try {
3119            for (MessageAck ack : acks) {
3120                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
3121            }
3122        } finally {
3123            this.indexLock.writeLock().unlock();
3124        }
3125    }
3126
3127    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
3128        if (acks != null) {
3129            this.indexLock.writeLock().lock();
3130            try {
3131                for (MessageAck ack : acks) {
3132                    final String id = ack.getLastMessageId().toProducerKey();
3133                    ackedAndPrepared.remove(id);
3134                    if (rollback) {
3135                        rolledBackAcks.add(id);
3136                    }
3137                }
3138            } finally {
3139                this.indexLock.writeLock().unlock();
3140            }
3141        }
3142    }
3143
3144    @SuppressWarnings("rawtypes")
3145    private List<Operation> getInflightTx(KahaTransactionInfo info) {
3146        TransactionId key = TransactionIdConversion.convert(info);
3147        List<Operation> tx;
3148        synchronized (inflightTransactions) {
3149            tx = inflightTransactions.get(key);
3150            if (tx == null) {
3151                tx = Collections.synchronizedList(new ArrayList<Operation>());
3152                inflightTransactions.put(key, tx);
3153            }
3154        }
3155        return tx;
3156    }
3157
    // convert the KahaDB transaction info into an OpenWire TransactionId
    @SuppressWarnings("unused")
    private TransactionId key(KahaTransactionInfo transactionInfo) {
        return TransactionIdConversion.convert(transactionInfo);
    }
3162
    /**
     * A journaled command paired with its journal location, queued under an
     * in-flight or prepared transaction and replayed against the index via
     * {@link #execute} when the transaction outcome is decided.
     */
    abstract class Operation <T extends JournalCommand<T>> {
        final T command;
        final Location location;

        public Operation(T command, Location location) {
            this.command = command;
            this.location = location;
        }

        public Location getLocation() {
            return location;
        }

        public T getCommand() {
            return command;
        }

        // apply this operation to the index within the given transaction
        abstract public void execute(Transaction tx) throws IOException;
    }
3182
    /**
     * Transactional add: updates the index for an add-message command and, if
     * provided, notifies the IndexAware callback of the assigned sequence while
     * the index lock is still held.
     */
    class AddOperation extends Operation<KahaAddMessageCommand> {
        final IndexAware runWithIndexLock;
        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
            super(command, location);
            this.runWithIndexLock = runWithIndexLock;
        }

        @Override
        public void execute(Transaction tx) throws IOException {
            long seq = updateIndex(tx, command, location);
            if (runWithIndexLock != null) {
                // callers may need the assigned sequence while still under the index lock
                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
            }
        }
    }
3198
    /**
     * Transactional remove: updates the index for a remove-message command when
     * the owning transaction commits.
     */
    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {

        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
            super(command, location);
        }

        @Override
        public void execute(Transaction tx) throws IOException {
            updateIndex(tx, command, location);
        }
    }
3210
3211    // /////////////////////////////////////////////////////////////////
3212    // Initialization related implementation methods.
3213    // /////////////////////////////////////////////////////////////////
3214
3215    private PageFile createPageFile() throws IOException {
3216        if (indexDirectory == null) {
3217            indexDirectory = directory;
3218        }
3219        IOHelper.mkdirs(indexDirectory);
3220        PageFile index = new PageFile(indexDirectory, "db");
3221        index.setEnableWriteThread(isEnableIndexWriteAsync());
3222        index.setWriteBatchSize(getIndexWriteBatchSize());
3223        index.setPageCacheSize(indexCacheSize);
3224        index.setUseLFRUEviction(isUseIndexLFRUEviction());
3225        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
3226        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
3227        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
3228        index.setEnablePageCaching(isEnableIndexPageCaching());
3229        return index;
3230    }
3231
3232    protected Journal createJournal() throws IOException {
3233        Journal manager = new Journal();
3234        manager.setDirectory(directory);
3235        manager.setMaxFileLength(getJournalMaxFileLength());
3236        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
3237        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
3238        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
3239        manager.setArchiveDataLogs(isArchiveDataLogs());
3240        manager.setSizeAccumulator(journalSize);
3241        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
3242        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
3243        manager.setPreallocationStrategy(
3244                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
3245        manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
3246        if (getDirectoryArchive() != null) {
3247            IOHelper.mkdirs(getDirectoryArchive());
3248            manager.setDirectoryArchive(getDirectoryArchive());
3249        }
3250        return manager;
3251    }
3252
3253    private Metadata createMetadata() {
3254        Metadata md = new Metadata();
3255        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
3256        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
3257        return md;
3258    }
3259
    // hook for subclasses to register their destination indexes on the metadata
    protected abstract void configureMetadata();

    // maximum bytes batched per journal write
    public int getJournalMaxWriteBatchSize() {
        return journalMaxWriteBatchSize;
    }

    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
    }

    // directory holding the journal data files
    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    // when true, all persisted messages are deleted on start up
    public boolean isDeleteAllMessages() {
        return deleteAllMessages;
    }

    public void setDeleteAllMessages(boolean deleteAllMessages) {
        this.deleteAllMessages = deleteAllMessages;
    }

    // index write batch size (the field shares the setter's name historically)
    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
    }

    public int getIndexWriteBatchSize() {
        return setIndexWriteBatchSize;
    }

    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }

    boolean isEnableIndexWriteAsync() {
        return enableIndexWriteAsync;
    }
3301
3302    /**
3303     * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead
3304     * @return
3305     */
3306    @Deprecated
3307    public boolean isEnableJournalDiskSyncs() {
3308        return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS;
3309    }
3310
3311    /**
3312     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
3313     * @param syncWrites
3314     */
3315    @Deprecated
3316    public void setEnableJournalDiskSyncs(boolean syncWrites) {
3317        if (syncWrites) {
3318            journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
3319        } else {
3320            journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER;
3321        }
3322    }
3323
3324    public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() {
3325        return journalDiskSyncStrategy;
3326    }
3327
3328    public String getJournalDiskSyncStrategy() {
3329        return journalDiskSyncStrategy.name();
3330    }
3331
3332    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
3333        this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase());
3334    }
3335
3336    public long getJournalDiskSyncInterval() {
3337        return journalDiskSyncInterval;
3338    }
3339
3340    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
3341        this.journalDiskSyncInterval = journalDiskSyncInterval;
3342    }
3343
    // interval in milliseconds between index checkpoints
    public long getCheckpointInterval() {
        return checkpointInterval;
    }

    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }

    // interval in milliseconds between journal cleanup passes
    public long getCleanupInterval() {
        return cleanupInterval;
    }

    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }

    // maximum length in bytes of a single journal data file
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }

    public int getJournalMaxFileLength() {
        return journalMaxFileLength;
    }

    // duplicate-producer audit settings are delegated to the metadata's tracker
    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
    }

    public int getMaxFailoverProducersToTrack() {
        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
    }

    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
    }

    public int getFailoverProducersAuditDepth() {
        return this.metadata.producerSequenceIdTracker.getAuditDepth();
    }
3383
    // lazily create the index page file on first access
    // NOTE(review): the lazy init is not synchronized - presumably only called
    // during single-threaded start up; confirm before relying on it concurrently
    public PageFile getPageFile() throws IOException {
        if (pageFile == null) {
            pageFile = createPageFile();
        }
        return pageFile;
    }

    // lazily create the journal on first access (same synchronization caveat as above)
    public Journal getJournal() throws IOException {
        if (journal == null) {
            journal = createJournal();
        }
        return journal;
    }

    protected Metadata getMetadata() {
        return metadata;
    }
3401
    /**
     * @return whether startup fails when the database is already locked.
     */
    public boolean isFailIfDatabaseIsLocked() {
        return failIfDatabaseIsLocked;
    }

    /**
     * @param failIfDatabaseIsLocked true to fail when the database is already locked.
     */
    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
    }

    /**
     * @return whether missing journal files are ignored during recovery checks.
     */
    public boolean isIgnoreMissingJournalfiles() {
        return ignoreMissingJournalfiles;
    }

    /**
     * @param ignoreMissingJournalfiles true to ignore missing journal files.
     */
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
    }

    /**
     * @return the configured index cache size.
     */
    public int getIndexCacheSize() {
        return indexCacheSize;
    }

    /**
     * @param indexCacheSize the index cache size to use.
     */
    public void setIndexCacheSize(int indexCacheSize) {
        this.indexCacheSize = indexCacheSize;
    }

    /**
     * @return whether journal files are checked for corruption.
     */
    public boolean isCheckForCorruptJournalFiles() {
        return checkForCorruptJournalFiles;
    }

    /**
     * @param checkForCorruptJournalFiles true to check journal files for corruption.
     */
    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
    }

    /**
     * @return the strategy enum for handling recovered XA transactions.
     */
    public PurgeRecoveredXATransactionStrategy getPurgeRecoveredXATransactionStrategyEnum() {
        return purgeRecoveredXATransactionStrategy;
    }

    /**
     * @return the name of the configured recovered-XA-transaction strategy.
     */
    public String getPurgeRecoveredXATransactionStrategy() {
        return purgeRecoveredXATransactionStrategy.name();
    }
3441
3442    public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) {
3443        this.purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.valueOf(
3444                purgeRecoveredXATransactionStrategy.trim().toUpperCase());
3445    }
3446
    /**
     * @return whether journal files are written with checksums.
     */
    public boolean isChecksumJournalFiles() {
        return checksumJournalFiles;
    }

    /**
     * @param checksumJournalFiles true to write journal files with checksums.
     */
    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        this.checksumJournalFiles = checksumJournalFiles;
    }

    /**
     * Stores a reference to the owning {@link BrokerService}.
     */
    @Override
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }
3459
3460    /**
3461     * @return the archiveDataLogs
3462     */
3463    public boolean isArchiveDataLogs() {
3464        return this.archiveDataLogs;
3465    }
3466
3467    /**
3468     * @param archiveDataLogs the archiveDataLogs to set
3469     */
3470    public void setArchiveDataLogs(boolean archiveDataLogs) {
3471        this.archiveDataLogs = archiveDataLogs;
3472    }
3473
3474    /**
3475     * @return the directoryArchive
3476     */
3477    public File getDirectoryArchive() {
3478        return this.directoryArchive;
3479    }
3480
3481    /**
3482     * @param directoryArchive the directoryArchive to set
3483     */
3484    public void setDirectoryArchive(File directoryArchive) {
3485        this.directoryArchive = directoryArchive;
3486    }
3487
3488    public boolean isArchiveCorruptedIndex() {
3489        return archiveCorruptedIndex;
3490    }
3491
3492    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
3493        this.archiveCorruptedIndex = archiveCorruptedIndex;
3494    }
3495
3496    public float getIndexLFUEvictionFactor() {
3497        return indexLFUEvictionFactor;
3498    }
3499
3500    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
3501        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
3502    }
3503
3504    public boolean isUseIndexLFRUEviction() {
3505        return useIndexLFRUEviction;
3506    }
3507
3508    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
3509        this.useIndexLFRUEviction = useIndexLFRUEviction;
3510    }
3511
3512    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
3513        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
3514    }
3515
3516    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
3517        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
3518    }
3519
3520    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
3521        this.enableIndexPageCaching = enableIndexPageCaching;
3522    }
3523
3524    public boolean isEnableIndexDiskSyncs() {
3525        return enableIndexDiskSyncs;
3526    }
3527
3528    public boolean isEnableIndexRecoveryFile() {
3529        return enableIndexRecoveryFile;
3530    }
3531
3532    public boolean isEnableIndexPageCaching() {
3533        return enableIndexPageCaching;
3534    }
3535
3536    public PersistenceAdapterStatistics getPersistenceAdapterStatistics() {
3537        return this.persistenceAdapterStatistics;
3538    }
3539
3540    // /////////////////////////////////////////////////////////////////
3541    // Internal conversion methods.
3542    // /////////////////////////////////////////////////////////////////
3543
3544    class MessageOrderCursor{
3545        long defaultCursorPosition;
3546        long lowPriorityCursorPosition;
3547        long highPriorityCursorPosition;
3548        MessageOrderCursor(){
3549        }
3550
3551        MessageOrderCursor(long position){
3552            this.defaultCursorPosition=position;
3553            this.lowPriorityCursorPosition=position;
3554            this.highPriorityCursorPosition=position;
3555        }
3556
3557        MessageOrderCursor(MessageOrderCursor other){
3558            this.defaultCursorPosition=other.defaultCursorPosition;
3559            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3560            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3561        }
3562
3563        MessageOrderCursor copy() {
3564            return new MessageOrderCursor(this);
3565        }
3566
3567        void reset() {
3568            this.defaultCursorPosition=0;
3569            this.highPriorityCursorPosition=0;
3570            this.lowPriorityCursorPosition=0;
3571        }
3572
3573        void increment() {
3574            if (defaultCursorPosition!=0) {
3575                defaultCursorPosition++;
3576            }
3577            if (highPriorityCursorPosition!=0) {
3578                highPriorityCursorPosition++;
3579            }
3580            if (lowPriorityCursorPosition!=0) {
3581                lowPriorityCursorPosition++;
3582            }
3583        }
3584
3585        @Override
3586        public String toString() {
3587           return "MessageOrderCursor:[def:" + defaultCursorPosition
3588                   + ", low:" + lowPriorityCursorPosition
3589                   + ", high:" +  highPriorityCursorPosition + "]";
3590        }
3591
3592        public void sync(MessageOrderCursor other) {
3593            this.defaultCursorPosition=other.defaultCursorPosition;
3594            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3595            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3596        }
3597    }
3598
3599    class MessageOrderIndex {
3600        static final byte HI = 9;
3601        static final byte LO = 0;
3602        static final byte DEF = 4;
3603
3604        long nextMessageId;
3605        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3606        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3607        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3608        final MessageOrderCursor cursor = new MessageOrderCursor();
3609        Long lastDefaultKey;
3610        Long lastHighKey;
3611        Long lastLowKey;
3612        byte lastGetPriority;
3613        final List<Long> pendingAdditions = new LinkedList<>();
3614        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
3615
3616        MessageKeys remove(Transaction tx, Long key) throws IOException {
3617            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3618            if (result == null && highPriorityIndex!=null) {
3619                result = highPriorityIndex.remove(tx, key);
3620                if (result ==null && lowPriorityIndex!=null) {
3621                    result = lowPriorityIndex.remove(tx, key);
3622                }
3623            }
3624            return result;
3625        }
3626
3627        void load(Transaction tx) throws IOException {
3628            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3629            defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3630            defaultPriorityIndex.load(tx);
3631            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3632            lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3633            lowPriorityIndex.load(tx);
3634            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3635            highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3636            highPriorityIndex.load(tx);
3637        }
3638
3639        void allocate(Transaction tx) throws IOException {
3640            defaultPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
3641            if (metadata.version >= 2) {
3642                lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
3643                highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
3644            }
3645        }
3646
3647        void configureLast(Transaction tx) throws IOException {
3648            // Figure out the next key using the last entry in the destination.
3649            TreeSet<Long> orderedSet = new TreeSet<>();
3650
3651            addLast(orderedSet, highPriorityIndex, tx);
3652            addLast(orderedSet, defaultPriorityIndex, tx);
3653            addLast(orderedSet, lowPriorityIndex, tx);
3654
3655            if (!orderedSet.isEmpty()) {
3656                nextMessageId = orderedSet.last() + 1;
3657            }
3658        }
3659
3660        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3661            if (index != null) {
3662                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3663                if (lastEntry != null) {
3664                    orderedSet.add(lastEntry.getKey());
3665                }
3666            }
3667        }
3668
3669        void clear(Transaction tx) throws IOException {
3670            this.remove(tx);
3671            this.resetCursorPosition();
3672            this.allocate(tx);
3673            this.load(tx);
3674            this.configureLast(tx);
3675        }
3676
3677        void remove(Transaction tx) throws IOException {
3678            defaultPriorityIndex.clear(tx);
3679            defaultPriorityIndex.unload(tx);
3680            tx.free(defaultPriorityIndex.getPageId());
3681            if (lowPriorityIndex != null) {
3682                lowPriorityIndex.clear(tx);
3683                lowPriorityIndex.unload(tx);
3684
3685                tx.free(lowPriorityIndex.getPageId());
3686            }
3687            if (highPriorityIndex != null) {
3688                highPriorityIndex.clear(tx);
3689                highPriorityIndex.unload(tx);
3690                tx.free(highPriorityIndex.getPageId());
3691            }
3692        }
3693
3694        void resetCursorPosition() {
3695            this.cursor.reset();
3696            lastDefaultKey = null;
3697            lastHighKey = null;
3698            lastLowKey = null;
3699        }
3700
3701        void setBatch(Transaction tx, Long sequence) throws IOException {
3702            if (sequence != null) {
3703                Long nextPosition = new Long(sequence.longValue() + 1);
3704                lastDefaultKey = sequence;
3705                cursor.defaultCursorPosition = nextPosition.longValue();
3706                lastHighKey = sequence;
3707                cursor.highPriorityCursorPosition = nextPosition.longValue();
3708                lastLowKey = sequence;
3709                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3710            }
3711        }
3712
3713        void setBatch(Transaction tx, LastAck last) throws IOException {
3714            setBatch(tx, last.lastAckedSequence);
3715            if (cursor.defaultCursorPosition == 0
3716                    && cursor.highPriorityCursorPosition == 0
3717                    && cursor.lowPriorityCursorPosition == 0) {
3718                long next = last.lastAckedSequence + 1;
3719                switch (last.priority) {
3720                    case DEF:
3721                        cursor.defaultCursorPosition = next;
3722                        cursor.highPriorityCursorPosition = next;
3723                        break;
3724                    case HI:
3725                        cursor.highPriorityCursorPosition = next;
3726                        break;
3727                    case LO:
3728                        cursor.lowPriorityCursorPosition = next;
3729                        cursor.defaultCursorPosition = next;
3730                        cursor.highPriorityCursorPosition = next;
3731                        break;
3732                }
3733            }
3734        }
3735
3736        void stoppedIterating() {
3737            if (lastDefaultKey!=null) {
3738                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3739            }
3740            if (lastHighKey!=null) {
3741                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3742            }
3743            if (lastLowKey!=null) {
3744                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3745            }
3746            lastDefaultKey = null;
3747            lastHighKey = null;
3748            lastLowKey = null;
3749        }
3750
3751        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3752                throws IOException {
3753            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3754                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3755            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3756                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3757            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3758                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3759            }
3760        }
3761
3762        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3763                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3764
3765            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3766            deletes.add(iterator.next());
3767        }
3768
3769        long getNextMessageId() {
3770            return nextMessageId++;
3771        }
3772
3773        void revertNextMessageId() {
3774            nextMessageId--;
3775        }
3776
3777        MessageKeys get(Transaction tx, Long key) throws IOException {
3778            MessageKeys result = defaultPriorityIndex.get(tx, key);
3779            if (result == null) {
3780                result = highPriorityIndex.get(tx, key);
3781                if (result == null) {
3782                    result = lowPriorityIndex.get(tx, key);
3783                    lastGetPriority = LO;
3784                } else {
3785                    lastGetPriority = HI;
3786                }
3787            } else {
3788                lastGetPriority = DEF;
3789            }
3790            return result;
3791        }
3792
3793        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3794            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3795                return defaultPriorityIndex.put(tx, key, value);
3796            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3797                return highPriorityIndex.put(tx, key, value);
3798            } else {
3799                return lowPriorityIndex.put(tx, key, value);
3800            }
3801        }
3802
3803        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3804            return new MessageOrderIterator(tx,cursor,this);
3805        }
3806
3807        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3808            return new MessageOrderIterator(tx,m,this);
3809        }
3810
3811        public byte lastGetPriority() {
3812            return lastGetPriority;
3813        }
3814
3815        public boolean alreadyDispatched(Long sequence) {
3816            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3817                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3818                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3819        }
3820
3821        public void trackPendingAdd(Long seq) {
3822            synchronized (pendingAdditions) {
3823                pendingAdditions.add(seq);
3824            }
3825        }
3826
3827        public void trackPendingAddComplete(Long seq) {
3828            synchronized (pendingAdditions) {
3829                pendingAdditions.remove(seq);
3830            }
3831        }
3832
3833        public Long minPendingAdd() {
3834            synchronized (pendingAdditions) {
3835                if (!pendingAdditions.isEmpty()) {
3836                    return pendingAdditions.get(0);
3837                } else {
3838                    return null;
3839                }
3840            }
3841        }
3842
3843        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3844            Iterator<Entry<Long, MessageKeys>>currentIterator;
3845            final Iterator<Entry<Long, MessageKeys>>highIterator;
3846            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3847            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3848
3849            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3850                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3851                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3852                if (highPriorityIndex != null) {
3853                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3854                } else {
3855                    this.highIterator = null;
3856                }
3857                if (lowPriorityIndex != null) {
3858                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3859                } else {
3860                    this.lowIterator = null;
3861                }
3862            }
3863
3864            @Override
3865            public boolean hasNext() {
3866                if (currentIterator == null) {
3867                    if (highIterator != null) {
3868                        if (highIterator.hasNext()) {
3869                            currentIterator = highIterator;
3870                            return currentIterator.hasNext();
3871                        }
3872                        if (defaultIterator.hasNext()) {
3873                            currentIterator = defaultIterator;
3874                            return currentIterator.hasNext();
3875                        }
3876                        if (lowIterator.hasNext()) {
3877                            currentIterator = lowIterator;
3878                            return currentIterator.hasNext();
3879                        }
3880                        return false;
3881                    } else {
3882                        currentIterator = defaultIterator;
3883                        return currentIterator.hasNext();
3884                    }
3885                }
3886                if (highIterator != null) {
3887                    if (currentIterator.hasNext()) {
3888                        return true;
3889                    }
3890                    if (currentIterator == highIterator) {
3891                        if (defaultIterator.hasNext()) {
3892                            currentIterator = defaultIterator;
3893                            return currentIterator.hasNext();
3894                        }
3895                        if (lowIterator.hasNext()) {
3896                            currentIterator = lowIterator;
3897                            return currentIterator.hasNext();
3898                        }
3899                        return false;
3900                    }
3901
3902                    if (currentIterator == defaultIterator) {
3903                        if (lowIterator.hasNext()) {
3904                            currentIterator = lowIterator;
3905                            return currentIterator.hasNext();
3906                        }
3907                        return false;
3908                    }
3909                }
3910                return currentIterator.hasNext();
3911            }
3912
3913            @Override
3914            public Entry<Long, MessageKeys> next() {
3915                Entry<Long, MessageKeys> result = currentIterator.next();
3916                if (result != null) {
3917                    Long key = result.getKey();
3918                    if (highIterator != null) {
3919                        if (currentIterator == defaultIterator) {
3920                            lastDefaultKey = key;
3921                        } else if (currentIterator == highIterator) {
3922                            lastHighKey = key;
3923                        } else {
3924                            lastLowKey = key;
3925                        }
3926                    } else {
3927                        lastDefaultKey = key;
3928                    }
3929                }
3930                return result;
3931            }
3932
3933            @Override
3934            public void remove() {
3935                throw new UnsupportedOperationException();
3936            }
3937        }
3938    }
3939
3940    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3941        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3942
3943        @Override
3944        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3945            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3946            ObjectOutputStream oout = new ObjectOutputStream(baos);
3947            oout.writeObject(object);
3948            oout.flush();
3949            oout.close();
3950            byte[] data = baos.toByteArray();
3951            dataOut.writeInt(data.length);
3952            dataOut.write(data);
3953        }
3954
3955        @Override
3956        @SuppressWarnings("unchecked")
3957        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3958            int dataLen = dataIn.readInt();
3959            byte[] data = new byte[dataLen];
3960            dataIn.readFully(data);
3961            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3962            ObjectInputStream oin = new ObjectInputStream(bais);
3963            try {
3964                return (HashSet<String>) oin.readObject();
3965            } catch (ClassNotFoundException cfe) {
3966                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3967                ioe.initCause(cfe);
3968                throw ioe;
3969            }
3970        }
3971    }
3972
    /**
     * @return the directory holding the index files, or null if unset.
     */
    public File getIndexDirectory() {
        return indexDirectory;
    }

    /**
     * @param indexDirectory the directory in which to keep the index files.
     */
    public void setIndexDirectory(File indexDirectory) {
        this.indexDirectory = indexDirectory;
    }
3980
3981    interface IndexAware {
3982        public void sequenceAssignedWithIndexLocked(long index);
3983    }
3984
    /**
     * @return the configured journal preallocation scope.
     */
    public String getPreallocationScope() {
        return preallocationScope;
    }

    /**
     * @param preallocationScope the journal preallocation scope to use.
     */
    public void setPreallocationScope(String preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    /**
     * @return the configured journal preallocation strategy.
     */
    public String getPreallocationStrategy() {
        return preallocationStrategy;
    }

    /**
     * @param preallocationStrategy the journal preallocation strategy to use.
     */
    public void setPreallocationStrategy(String preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    /**
     * @return the number of empty GC cycles before ack compaction runs.
     */
    public int getCompactAcksAfterNoGC() {
        return compactAcksAfterNoGC;
    }

    /**
     * Sets the number of GC cycles where no journal logs were removed before an attempt to
     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
     * <p>
     * A value of -1 will disable this feature.
     *
     * @param compactAcksAfterNoGC
     *      Number of empty GC cycles before we rewrite old ACKS.
     */
    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
    }

    /**
     * Returns whether Ack compaction will ignore that the store is still growing
     * and run more often.
     *
     * @return the compactAcksIgnoresStoreGrowth current value.
     */
    public boolean isCompactAcksIgnoresStoreGrowth() {
        return compactAcksIgnoresStoreGrowth;
    }

    /**
     * Configure if Ack compaction will occur regardless of continued growth of the
     * journal logs meaning that the store has not run out of space yet.  Because the
     * compaction operation can be costly this value is defaulted to off and the Ack
     * compaction is only done when it seems that the store cannot grow and larger.
     *
     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
     */
    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
    }

    /**
     * Returns whether Ack compaction is enabled
     *
     * @return enableAckCompaction
     */
    public boolean isEnableAckCompaction() {
        return enableAckCompaction;
    }

    /**
     * Configure if the Ack compaction task should be enabled to run
     *
     * @param enableAckCompaction true to enable the Ack compaction task.
     */
    public void setEnableAckCompaction(boolean enableAckCompaction) {
        this.enableAckCompaction = enableAckCompaction;
    }

    /**
     * @return whether per-subscription statistics caching is enabled.
     */
    public boolean isEnableSubscriptionStatistics() {
        return enableSubscriptionStatistics;
    }

    /**
     * Enable caching statistics for each subscription to allow non-blocking
     * retrieval of metrics.  This could incur some overhead to compute if there are a lot
     * of subscriptions.
     *
     * @param enableSubscriptionStatistics true to cache per-subscription statistics.
     */
    public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
        this.enableSubscriptionStatistics = enableSubscriptionStatistics;
    }
4075}