/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
import org.apache.activemq.store.kahadb.disk.util.Marshaller;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {

    protected BrokerService brokerService;

    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
    protected static final Buffer UNMATCHED;
    static {
        UNMATCHED = new Buffer(new byte[]{});
    }
    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);

    static final int CLOSED_STATE = 1;
    static final int OPEN_STATE = 2;
    static final long NOT_ACKED = -1;

    static final int VERSION = 6;

    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;

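    // Metadata lives on page 0 of the index PageFile. It records the index state, the root
    // of the destinations index and the journal locations needed to bound recovery: the last
    // index update, the first in-progress transaction, and the producer-audit and
    // ack-message-file-map snapshots.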
    protected class Metadata {
        protected Page<Metadata> page;
        protected int state;
        protected BTreeIndex<String, StoredDestination> destinations;
        protected Location lastUpdate;
        protected Location firstInProgressTransactionLocation;
        protected Location producerSequenceIdTrackerLocation = null;
        protected Location ackMessageFileMapLocation = null;
        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
        protected int version = VERSION;
        protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;

        public void read(DataInput is) throws IOException {
            state = is.readInt();
            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
            if (is.readBoolean()) {
                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
            } else {
                lastUpdate = null;
            }
            if (is.readBoolean()) {
                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
            } else {
                firstInProgressTransactionLocation = null;
            }
            try {
                if (is.readBoolean()) {
                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
                } else {
                    producerSequenceIdTrackerLocation = null;
                }
            } catch (EOFException expectedOnUpgrade) {
            }
            try {
                version = is.readInt();
            } catch (EOFException expectedOnUpgrade) {
                version = 1;
            }
            if (version >= 5 && is.readBoolean()) {
                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
            } else {
                ackMessageFileMapLocation = null;
            }
            try {
                openwireVersion = is.readInt();
            } catch (EOFException expectedOnUpgrade) {
                openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
            }
            LOG.info("KahaDB is version " + version);
        }

        public void write(DataOutput os) throws IOException {
            os.writeInt(state);
            os.writeLong(destinations.getPageId());

            if (lastUpdate != null) {
                os.writeBoolean(true);
                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
            } else {
                os.writeBoolean(false);
            }

            if (firstInProgressTransactionLocation != null) {
                os.writeBoolean(true);
                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
            } else {
                os.writeBoolean(false);
            }

            if (producerSequenceIdTrackerLocation != null) {
                os.writeBoolean(true);
                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
            } else {
                os.writeBoolean(false);
            }
            os.writeInt(VERSION);
            if (ackMessageFileMapLocation != null) {
                os.writeBoolean(true);
                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
            } else {
                os.writeBoolean(false);
            }
            os.writeInt(this.openwireVersion);
        }
    }

    class MetadataMarshaller extends VariableMarshaller<Metadata> {
        @Override
        public Metadata readPayload(DataInput dataIn) throws IOException {
            Metadata rc = createMetadata();
            rc.read(dataIn);
            return rc;
        }

        @Override
        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
            object.write(dataOut);
        }
    }

    protected PageFile pageFile;
    protected Journal journal;
    protected Metadata metadata = new Metadata();

    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();

    protected boolean failIfDatabaseIsLocked;

    protected boolean deleteAllMessages;
    protected File directory = DEFAULT_DIRECTORY;
    protected File indexDirectory = null;
    protected ScheduledExecutorService scheduler;
    private final Object schedulerLock = new Object();

    protected boolean enableJournalDiskSyncs = true;
    protected boolean archiveDataLogs;
    protected File directoryArchive;
    protected AtomicLong journalSize = new AtomicLong(0);
    long checkpointInterval = 5*1000;
    long cleanupInterval = 30*1000;
    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    boolean enableIndexWriteAsync = false;
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();

    protected AtomicBoolean opened = new AtomicBoolean();
    private boolean ignoreMissingJournalfiles = false;
    private int indexCacheSize = 10000;
    private boolean checkForCorruptJournalFiles = false;
    private boolean checksumJournalFiles = true;
    protected boolean forceRecoverIndex = false;
    private boolean archiveCorruptedIndex = false;
    private boolean useIndexLFRUEviction = false;
    private float indexLFUEvictionFactor = 0.2f;
    private boolean enableIndexDiskSyncs = true;
    private boolean enableIndexRecoveryFile = true;
    private boolean enableIndexPageCaching = true;
    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    private boolean enableAckCompaction = false;
    private int compactAcksAfterNoGC = 10;
    private boolean compactAcksIgnoresStoreGrowth = false;
    private int checkPointCyclesWithNoGC;
    private int journalLogOnLastCompactionCheck;

    @Override
    public void doStart() throws Exception {
        load();
    }

    @Override
    public void doStop(ServiceStopper stopper) throws Exception {
        unload();
    }

    private void loadPageFile() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            final PageFile pageFile = getPageFile();
            pageFile.load();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    if (pageFile.getPageCount() == 0) {
                        // First time this is created.. Initialize the metadata
                        Page<Metadata> page = tx.allocate();
                        assert page.getPageId() == 0;
                        page.set(metadata);
                        metadata.page = page;
                        metadata.state = CLOSED_STATE;
                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());

                        tx.store(metadata.page, metadataMarshaller, true);
                    } else {
                        Page<Metadata> page = tx.load(0, metadataMarshaller);
                        metadata = page.get();
                        metadata.page = page;
                    }
                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
                    metadata.destinations.load(tx);
                }
            });
            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
            // Perhaps we should just keep an index of file
            storedDestinations.clear();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
                        Entry<String, StoredDestination> entry = iterator.next();
                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
                        storedDestinations.put(entry.getKey(), sd);

                        if (checkForCorruptJournalFiles) {
                            // sanity check the index also
                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
                                }
                            }
                        }
                    }
                }
            });
            pageFile.flush();
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

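    // Starts the single-threaded checkpoint/cleanup scheduler. The worker polls at a short
    // fixed delay and decides on each pass whether a cleanup or a plain checkpoint is due
    // based on the configured intervals.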
    private void startCheckpoint() {
        if (checkpointInterval == 0 && cleanupInterval == 0) {
            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean shutdown/restart");
            return;
        }
        synchronized (schedulerLock) {
            if (scheduler == null || scheduler.isShutdown()) {
                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread schedulerThread = new Thread(r);

                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
                        schedulerThread.setDaemon(true);

                        return schedulerThread;
                    }
                });

                // Short intervals for check-point and cleanups
                long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);

                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
            }
        }
    }

    private final class CheckpointRunner implements Runnable {

        private long lastCheckpoint = System.currentTimeMillis();
        private long lastCleanup = System.currentTimeMillis();

        @Override
        public void run() {
            try {
                // Decide on cleanup vs full checkpoint here.
                if (opened.get()) {
                    long now = System.currentTimeMillis();
                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
                        checkpointCleanup(true);
                        lastCleanup = now;
                        lastCheckpoint = now;
                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
                        checkpointCleanup(false);
                        lastCheckpoint = now;
                    }
                }
            } catch (IOException ioe) {
                LOG.error("Checkpoint failed", ioe);
                brokerService.handleIOException(ioe);
            } catch (Throwable e) {
                LOG.error("Checkpoint failed", e);
                brokerService.handleIOException(IOExceptionSupport.create(e));
            }
        }
    }

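    // Opens the journal and loads the index. If the index cannot be loaded it is deleted
    // (or archived) and rebuilt from scratch by replaying the journal.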
    public void open() throws IOException {
        if (opened.compareAndSet(false, true)) {
            getJournal().start();
            try {
                loadPageFile();
            } catch (Throwable t) {
                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause: " + t);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Index load failure", t);
                }
                // try to recover the index
                try {
                    pageFile.unload();
                } catch (Exception ignore) {}
                if (archiveCorruptedIndex) {
                    pageFile.archive();
                } else {
                    pageFile.delete();
                }
                metadata = createMetadata();
                // The metadata was recreated after detecting corruption so we need to
                // reconfigure anything that was configured on the old metadata at startup
                configureMetadata();
                pageFile = null;
                loadPageFile();
            }
            startCheckpoint();
            recover();
        }
    }

    public void load() throws IOException {
        this.indexLock.writeLock().lock();
        IOHelper.mkdirs(directory);
        try {
            if (deleteAllMessages) {
                getJournal().start();
                getJournal().delete();
                getJournal().close();
                journal = null;
                getPageFile().delete();
                LOG.info("Persistence store purged.");
                deleteAllMessages = false;
            }

            open();
            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    public void close() throws IOException, InterruptedException {
        if (opened.compareAndSet(true, false)) {
            checkpointLock.writeLock().lock();
            try {
                if (metadata.page != null) {
                    checkpointUpdate(true);
                }
                pageFile.unload();
                metadata = createMetadata();
            } finally {
                checkpointLock.writeLock().unlock();
            }
            journal.close();
            synchronized (schedulerLock) {
                if (scheduler != null) {
                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
                    scheduler = null;
                }
            }
            // clear the cache and journalSize on shutdown of the store
            storeCache.clear();
            journalSize.set(0);
        }
    }

    public void unload() throws IOException, InterruptedException {
        this.indexLock.writeLock().lock();
        try {
            if (pageFile != null && pageFile.isLoaded()) {
                metadata.state = CLOSED_STATE;
                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];

                if (metadata.page != null) {
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            tx.store(metadata.page, metadataMarshaller, true);
                        }
                    });
                }
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
        close();
    }

    // public for testing
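    // Returns the journal locations of the earliest and latest operations across all
    // in-flight and prepared transactions, or nulls when there are none.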
509    @SuppressWarnings("rawtypes")
510    public Location[] getInProgressTxLocationRange() {
511        Location[] range = new Location[]{null, null};
512        synchronized (inflightTransactions) {
513            if (!inflightTransactions.isEmpty()) {
514                for (List<Operation> ops : inflightTransactions.values()) {
515                    if (!ops.isEmpty()) {
516                        trackMaxAndMin(range, ops);
517                    }
518                }
519            }
520            if (!preparedTransactions.isEmpty()) {
521                for (List<Operation> ops : preparedTransactions.values()) {
522                    if (!ops.isEmpty()) {
523                        trackMaxAndMin(range, ops);
524                    }
525                }
526            }
527        }
528        return range;
529    }
530
531    @SuppressWarnings("rawtypes")
532    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
533        Location t = ops.get(0).getLocation();
534        if (range[0] == null || t.compareTo(range[0]) <= 0) {
535            range[0] = t;
536        }
537        t = ops.get(ops.size() -1).getLocation();
538        if (range[1] == null || t.compareTo(range[1]) >= 0) {
539            range[1] = t;
540        }
541    }
542
    class TranInfo {
        TransactionId id;
        Location location;

        class OpCount {
            int add;
            int remove;
        }
        HashMap<KahaDestination, OpCount> destinationOpCount = new HashMap<KahaDestination, OpCount>();

        @SuppressWarnings("rawtypes")
        public void track(Operation operation) {
            if (location == null) {
                location = operation.getLocation();
            }
            KahaDestination destination;
            boolean isAdd = false;
            if (operation instanceof AddOperation) {
                AddOperation add = (AddOperation) operation;
                destination = add.getCommand().getDestination();
                isAdd = true;
            } else {
                RemoveOperation removeOperation = (RemoveOperation) operation;
                destination = removeOperation.getCommand().getDestination();
            }
            OpCount opCount = destinationOpCount.get(destination);
            if (opCount == null) {
                opCount = new OpCount();
                destinationOpCount.put(destination, opCount);
            }
            if (isAdd) {
                opCount.add++;
            } else {
                opCount.remove++;
            }
        }

        @Override
        public String toString() {
           StringBuffer buffer = new StringBuffer();
           buffer.append(location).append(";").append(id).append(";\n");
           for (Entry<KahaDestination, OpCount> op : destinationOpCount.entrySet()) {
               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
           }
           return buffer.toString();
        }
    }

591    @SuppressWarnings("rawtypes")
592    public String getTransactions() {
593
594        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
595        synchronized (inflightTransactions) {
596            if (!inflightTransactions.isEmpty()) {
597                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
598                    TranInfo info = new TranInfo();
599                    info.id = entry.getKey();
600                    for (Operation operation : entry.getValue()) {
601                        info.track(operation);
602                    }
603                    infos.add(info);
604                }
605            }
606        }
607        synchronized (preparedTransactions) {
608            if (!preparedTransactions.isEmpty()) {
609                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
610                    TranInfo info = new TranInfo();
611                    info.id = entry.getKey();
612                    for (Operation operation : entry.getValue()) {
613                        info.track(operation);
614                    }
615                    infos.add(info);
616                }
617            }
618        }
619        return infos.toString();
620    }
621
    /**
     * Move all the messages that were in the journal into long term storage. We
     * just replay and do a checkpoint.
     *
     * @throws IOException
     * @throws IllegalStateException
     */
    private void recover() throws IllegalStateException, IOException {
        this.indexLock.writeLock().lock();
        try {

            long start = System.currentTimeMillis();
            Location producerAuditPosition = recoverProducerAudit();
            Location ackMessageFileLocation = recoverAckMessageFileMap();
            Location lastIndoubtPosition = getRecoveryPosition();

            Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);

            if (recoveryPosition != null) {
                int redoCounter = 0;
                LOG.info("Recovering from the journal @" + recoveryPosition);
                while (recoveryPosition != null) {
                    try {
                        JournalCommand<?> message = load(recoveryPosition);
                        metadata.lastUpdate = recoveryPosition;
                        process(message, recoveryPosition, lastIndoubtPosition);
                        redoCounter++;
                    } catch (IOException failedRecovery) {
                        if (isIgnoreMissingJournalfiles()) {
                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                            // track this dud location
                            journal.corruptRecoveryLocation(recoveryPosition);
                        } else {
                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                        }
                    }
                    recoveryPosition = journal.getNextLocation(recoveryPosition);
                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
                    }
                }
                if (LOG.isInfoEnabled()) {
                    long end = System.currentTimeMillis();
                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
                }
            }

            // We may have to undo some index updates.
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    recoverIndex(tx);
                }
            });

            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
            Set<TransactionId> toRollback = new HashSet<TransactionId>();
            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
            synchronized (inflightTransactions) {
                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
                    TransactionId id = it.next();
                    if (id.isLocalTransaction()) {
                        toRollback.add(id);
                    } else {
                        toDiscard.add(id);
                    }
                }
                for (TransactionId tx: toRollback) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
                    }
                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
                }
                for (TransactionId tx: toDiscard) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
                    }
                    inflightTransactions.remove(tx);
                }
            }

            synchronized (preparedTransactions) {
                for (TransactionId txId : preparedTransactions.keySet()) {
                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
                }
            }

        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    @SuppressWarnings("unused")
    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
        return TransactionIdConversion.convertToLocal(tx);
    }

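    // Returns the earlier of the two locations, tolerating nulls on either side.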
    private Location minimum(Location producerAuditPosition,
            Location lastIndoubtPosition) {
        Location min = null;
        if (producerAuditPosition != null) {
            min = producerAuditPosition;
            if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
                min = lastIndoubtPosition;
            }
        } else {
            min = lastIndoubtPosition;
        }
        return min;
    }

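    // Restores the producer sequence id audit from its last journaled snapshot; if no snapshot
    // exists or it cannot be read, replay starts from the beginning of the journal instead.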
    private Location recoverProducerAudit() throws IOException {
        if (metadata.producerSequenceIdTrackerLocation != null) {
            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
            try {
                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
                int maxNumProducers = getMaxFailoverProducersToTrack();
                int maxAuditDepth = getFailoverProducersAuditDepth();
                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
                return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
            } catch (Exception e) {
                LOG.warn("Cannot recover message audit", e);
                return journal.getNextLocation(null);
            }
        } else {
            // no audit stored, so we have to recreate it via replay from the start of the journal
            return journal.getNextLocation(null);
        }
    }

756    @SuppressWarnings("unchecked")
757    private Location recoverAckMessageFileMap() throws IOException {
758        if (metadata.ackMessageFileMapLocation != null) {
759            KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
760            try {
761                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
762                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
763                return journal.getNextLocation(metadata.ackMessageFileMapLocation);
764            } catch (Exception e) {
765                LOG.warn("Cannot recover ackMessageFileMap", e);
766                return journal.getNextLocation(null);
767            }
768        } else {
769            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
770            return journal.getNextLocation(null);
771        }
772    }
773
    protected void recoverIndex(Transaction tx) throws IOException {
        long start = System.currentTimeMillis();
        // It is possible index updates got applied before the journal updates..
        // in that case we need to remove references to messages that are not in the journal
        final Location lastAppendLocation = journal.getLastAppendLocation();
        long undoCounter=0;

        // Go through all the destinations to see if they have messages past the lastAppendLocation
        for (String key : storedDestinations.keySet()) {
            StoredDestination sd = storedDestinations.get(key);

            final ArrayList<Long> matches = new ArrayList<Long>();
            // Find all the Locations that are >= than the last Append Location.
            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
                @Override
                protected void matched(Location key, Long value) {
                    matches.add(value);
                }
            });

            for (Long sequenceId : matches) {
                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                if (keys != null) {
                    sd.locationIndex.remove(tx, keys.location);
                    sd.messageIdIndex.remove(tx, keys.messageId);
                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
                    undoCounter++;
                    decrementAndSubSizeToStoreStat(key, keys.location.getSize());
                    // TODO: do we need to modify the ack positions for the pub sub case?
                }
            }
        }

        if (undoCounter > 0) {
            // The rolled-back operations are basically in-flight journal writes.  To avoid
            // these, the end user should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }

        undoCounter = 0;
        start = System.currentTimeMillis();

        // Let's be extra paranoid here and verify that all the data files being referenced
        // by the indexes still exist.

        final SequenceSet ss = new SequenceSet();
        for (StoredDestination sd : storedDestinations.values()) {
            // Use a visitor to cut down the number of pages that we load
            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                int last=-1;

                @Override
                public boolean isInterestedInKeysBetween(Location first, Location second) {
                    if( first==null ) {
                        return !ss.contains(0, second.getDataFileId());
                    } else if( second==null ) {
                        return true;
                    } else {
                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
                    }
                }

                @Override
                public void visit(List<Location> keys, List<Long> values) {
                    for (Location l : keys) {
                        int fileId = l.getDataFileId();
                        if( last != fileId ) {
                            ss.add(fileId);
                            last = fileId;
                        }
                    }
                }

            });
        }
        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
        while (!ss.isEmpty()) {
            missingJournalFiles.add((int) ss.removeFirst());
        }

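        // Data files referenced by the ack message file map must also still be present.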
        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
            missingJournalFiles.add(entry.getKey());
            for (Integer i : entry.getValue()) {
                missingJournalFiles.add(i);
            }
        }

        missingJournalFiles.removeAll(journal.getFileMap().keySet());

        if (!missingJournalFiles.isEmpty()) {
            LOG.warn("Some journal files are missing: " + missingJournalFiles);
        }

        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
        for (Integer missing : missingJournalFiles) {
            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
        }

        if (checkForCorruptJournalFiles) {
            Collection<DataFile> dataFiles = journal.getFileMap().values();
            for (DataFile dataFile : dataFiles) {
                int id = dataFile.getDataFileId();
                // eof to next file id
                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                Sequence seq = dataFile.getCorruptedBlocks().getHead();
                while (seq != null) {
                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
                    missingPredicates.add(visitor);
                    knownCorruption.add(visitor);
                    seq = seq.getNext();
                }
            }
        }

        if (!missingPredicates.isEmpty()) {
            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
                final StoredDestination sd = sdEntry.getValue();
                final ArrayList<Long> matches = new ArrayList<Long>();
                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                    @Override
                    protected void matched(Location key, Long value) {
                        matches.add(value);
                    }
                });

                // If some message references are affected by the missing data files...
                if (!matches.isEmpty()) {

                    // We either 'gracefully' recover dropping the missing messages or
                    // we error out.
                    if (ignoreMissingJournalfiles) {
                        // Update the index to remove the references to the missing data
                        for (Long sequenceId : matches) {
                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                            sd.locationIndex.remove(tx, keys.location);
                            sd.messageIdIndex.remove(tx, keys.messageId);
                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
                            undoCounter++;
                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
                            // TODO: do we need to modify the ack positions for the pub sub case?
                        }
                    } else {
                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected.");
                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " + matches.size() + " messages affected.");
                    }
                }
            }
        }

        if (!ignoreMissingJournalfiles) {
            if (!knownCorruption.isEmpty()) {
                LOG.error("Detected corrupt journal files. " + knownCorruption);
                throw new IOException("Detected corrupt journal files. " + knownCorruption);
            }

            if (!missingJournalFiles.isEmpty()) {
                LOG.error("Detected missing journal files. " + missingJournalFiles);
                throw new IOException("Detected missing journal files. " + missingJournalFiles);
            }
        }

        if (undoCounter > 0) {
            // The rolled-back operations are basically in-flight journal writes.  To avoid these,
            // the end user should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }
    }

    private Location nextRecoveryPosition;
    private Location lastRecoveryPosition;

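    // Replays any journal records appended since the last recovery pass and applies them
    // to the index, advancing the recovery position as it goes.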
    public void incrementalRecover() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            if (nextRecoveryPosition == null) {
                if (lastRecoveryPosition == null) {
                    nextRecoveryPosition = getRecoveryPosition();
                } else {
                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
                }
            }
            while (nextRecoveryPosition != null) {
                lastRecoveryPosition = nextRecoveryPosition;
                metadata.lastUpdate = lastRecoveryPosition;
                JournalCommand<?> message = load(lastRecoveryPosition);
                process(message, lastRecoveryPosition, (IndexAware) null);
                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    public Location getLastUpdatePosition() throws IOException {
        return metadata.lastUpdate;
    }

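    // Chooses where journal replay should start: the first in-progress transaction if any,
    // then the record after the last index update, falling back to the start of the journal
    // (always the start of the journal when forceRecoverIndex is set).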
    private Location getRecoveryPosition() throws IOException {

        if (!this.forceRecoverIndex) {

            // If we need to recover the transactions..
            if (metadata.firstInProgressTransactionLocation != null) {
                return metadata.firstInProgressTransactionLocation;
            }

            // Perhaps there were no transactions...
            if (metadata.lastUpdate != null) {
                // Start replay at the record after the last one recorded in the index file.
                return journal.getNextLocation(metadata.lastUpdate);
            }
        }
        // This loads the first position.
        return journal.getNextLocation(null);
    }

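    // Invoked by the checkpoint scheduler; skips the update if the store has been closed and
    // reports slow checkpoints when LOG_SLOW_ACCESS_TIME is configured.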
    protected void checkpointCleanup(final boolean cleanup) throws IOException {
        long start;
        this.indexLock.writeLock().lock();
        try {
            start = System.currentTimeMillis();
            if (!opened.get()) {
                return;
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
        checkpointUpdate(cleanup);
        long end = System.currentTimeMillis();
        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
            }
        }
    }

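    // Serializes a journal command as a single type byte followed by the framed protobuf
    // payload; this is the record format written to, and read back from, the journal.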
    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
        int size = data.serializedSizeFramed();
        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
        os.writeByte(data.type().getNumber());
        data.writeFramed(os);
        return os.toByteSequence();
    }

    // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
    // /////////////////////////////////////////////////////////////////
    public Location store(JournalCommand<?> data) throws IOException {
        return store(data, false, null, null);
    }

    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
        return store(data, false, null, null, onJournalStoreComplete);
    }

    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after) throws IOException {
        return store(data, sync, before, after, null);
    }

    /**
     * All updates are funneled through this method. The update is converted to
     * a JournalMessage which is logged to the journal and then the data from
     * the JournalMessage is used to update the index just like it would be done
     * during a recovery process.
     */
    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
        try {
            ByteSequence sequence = toByteSequence(data);
            Location location;

            checkpointLock.readLock().lock();
            try {

                long start = System.currentTimeMillis();
                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete);
                long start2 = System.currentTimeMillis();
                process(data, location, before);

                long end = System.currentTimeMillis();
                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Slow KahaDB access: Journal append took: " + (start2 - start) + " ms, Index update took " + (end - start2) + " ms");
                    }
                }
            } finally {
                checkpointLock.readLock().unlock();
            }

            if (after != null) {
                after.run();
            }

            if (scheduler == null && opened.get()) {
                startCheckpoint();
            }
            return location;
        } catch (IOException ioe) {
            LOG.error("KahaDB failed to store to Journal", ioe);
            brokerService.handleIOException(ioe);
            throw ioe;
        }
    }

    /**
     * Loads a previously stored JournalMessage
     *
     * @param location the journal location to read from
     * @return the journal command stored at that location
     * @throws IOException
     */
    public JournalCommand<?> load(Location location) throws IOException {
        long start = System.currentTimeMillis();
        ByteSequence data = journal.read(location);
        long end = System.currentTimeMillis();
        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Slow KahaDB access: Journal read took: " + (end - start) + " ms");
            }
        }
        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
        byte readByte = is.readByte();
        KahaEntryType type = KahaEntryType.valueOf(readByte);
        if (type == null) {
            try {
                is.close();
            } catch (IOException e) {}
            throw new IOException("Could not load journal record. Invalid location: " + location);
        }
        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
        message.mergeFramed(is);
        return message;
    }

    /**
     * Do minimal recovery until we reach the last in-doubt location.
     * @param data
     * @param location
     * @param inDoubtLocation
     * @throws IOException
     */
    void process(JournalCommand<?> data, final Location location, final Location inDoubtLocation) throws IOException {
        if (inDoubtLocation != null && location.compareTo(inDoubtLocation) >= 0) {
            process(data, location, (IndexAware) null);
        } else {
            // just recover producer audit
            data.visit(new Visitor() {
                @Override
                public void visit(KahaAddMessageCommand command) throws IOException {
                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
                }
            });
        }
    }

    // /////////////////////////////////////////////////////////////////
    // Journaled record processing methods. Once the record is journaled,
    // these methods handle applying the index updates. These may be called
    // from the recovery method too so they need to be idempotent
    // /////////////////////////////////////////////////////////////////

    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
        data.visit(new Visitor() {
            @Override
            public void visit(KahaAddMessageCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRemoveMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaPrepareCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaCommitCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRollbackCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRemoveDestinationCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaSubscriptionCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaProducerAuditCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaTraceCommand command) {
                processLocation(location);
            }

            @Override
            public void visit(KahaUpdateMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
                process(command, location);
            }
        });
    }

1206    @SuppressWarnings("rawtypes")
1207    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1208        if (command.hasTransactionInfo()) {
1209            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1210            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1211        } else {
1212            this.indexLock.writeLock().lock();
1213            try {
1214                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1215                    @Override
1216                    public void execute(Transaction tx) throws IOException {
1217                        long assignedIndex = updateIndex(tx, command, location);
1218                        if (runWithIndexLock != null) {
1219                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1220                        }
1221                    }
1222                });
1223
1224            } finally {
1225                this.indexLock.writeLock().unlock();
1226            }
1227        }
1228    }
1229
1230    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1231        this.indexLock.writeLock().lock();
1232        try {
1233            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1234                @Override
1235                public void execute(Transaction tx) throws IOException {
1236                    updateIndex(tx, command, location);
1237                }
1238            });
1239        } finally {
1240            this.indexLock.writeLock().unlock();
1241        }
1242    }
1243
1244    @SuppressWarnings("rawtypes")
1245    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1246        if (command.hasTransactionInfo()) {
1247            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1248            inflightTx.add(new RemoveOperation(command, location));
1249        } else {
1250            this.indexLock.writeLock().lock();
1251            try {
1252                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1253                    @Override
1254                    public void execute(Transaction tx) throws IOException {
1255                        updateIndex(tx, command, location);
1256                    }
1257                });
1258            } finally {
1259                this.indexLock.writeLock().unlock();
1260            }
1261        }
1262    }
1263
1264    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1265        this.indexLock.writeLock().lock();
1266        try {
1267            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1268                @Override
1269                public void execute(Transaction tx) throws IOException {
1270                    updateIndex(tx, command, location);
1271                }
1272            });
1273        } finally {
1274            this.indexLock.writeLock().unlock();
1275        }
1276    }
1277
1278    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1279        this.indexLock.writeLock().lock();
1280        try {
1281            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1282                @Override
1283                public void execute(Transaction tx) throws IOException {
1284                    updateIndex(tx, command, location);
1285                }
1286            });
1287        } finally {
1288            this.indexLock.writeLock().unlock();
1289        }
1290    }
1291
1292    protected void processLocation(final Location location) {
1293        this.indexLock.writeLock().lock();
1294        try {
1295            metadata.lastUpdate = location;
1296        } finally {
1297            this.indexLock.writeLock().unlock();
1298        }
1299    }
1300
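    /**
     * Applies a commit by replaying the transaction's queued operations against the index
     * in a single page-file transaction. If the transaction holds no persistent operations
     * the {@code before} callback is still notified, with a sequence of -1.
     */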
1301    @SuppressWarnings("rawtypes")
1302    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1303        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1304        List<Operation> inflightTx;
1305        synchronized (inflightTransactions) {
1306            inflightTx = inflightTransactions.remove(key);
1307            if (inflightTx == null) {
1308                inflightTx = preparedTransactions.remove(key);
1309            }
1310        }
1311        if (inflightTx == null) {
1312            // only non-persistent messages in this tx
1313            if (before != null) {
1314                before.sequenceAssignedWithIndexLocked(-1);
1315            }
1316            return;
1317        }
1318
1319        final List<Operation> messagingTx = inflightTx;
1320        indexLock.writeLock().lock();
1321        try {
1322            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1323                @Override
1324                public void execute(Transaction tx) throws IOException {
1325                    for (Operation op : messagingTx) {
1326                        op.execute(tx);
1327                    }
1328                }
1329            });
1330            metadata.lastUpdate = location;
1331        } finally {
1332            indexLock.writeLock().unlock();
1333        }
1334    }
1335
1336    @SuppressWarnings("rawtypes")
1337    protected void process(KahaPrepareCommand command, Location location) {
1338        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1339        synchronized (inflightTransactions) {
1340            List<Operation> tx = inflightTransactions.remove(key);
1341            if (tx != null) {
1342                preparedTransactions.put(key, tx);
1343            }
1344        }
1345    }
1346
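    /**
     * Applies a rollback by discarding any in-flight or prepared operations recorded for
     * the transaction; nothing is written to the index.
     */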
1347    @SuppressWarnings("rawtypes")
1348    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1349        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1350        List<Operation> updates = null;
1351        synchronized (inflightTransactions) {
1352            updates = inflightTransactions.remove(key);
1353            if (updates == null) {
1354                updates = preparedTransactions.remove(key);
1355            }
1356        }
1357    }
1358
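    /**
     * Handles the marker written at the head of a journal file produced by ack compaction.
     * If the original source file still exists (and the command asks to skip in that case),
     * the current file is flagged with the rewrite type code and the read offset is pushed
     * to the end of the file so recovery skips its duplicated contents.
     */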
1359    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1360        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1361        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1362            // Mark the current journal file as a compacted file so that gc checks can skip
1363            // over logs that are smaller compaction type logs.
1364            DataFile current = journal.getDataFileById(location.getDataFileId());
1365            current.setTypeCode(command.getRewriteType());
1366
1367            // Move offset so that next location read jumps to next file.
1368            location.setOffset(journalMaxFileLength);
1369        }
1370    }
1371
1372    // /////////////////////////////////////////////////////////////////
1373    // These methods do the actual index updates.
1374    // /////////////////////////////////////////////////////////////////
1375
1376    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1377    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1378
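    /**
     * Inserts a message into the destination's location, message-id and order indexes and
     * returns the assigned sequence id, or -1 when the add is skipped (a topic with no
     * subscriptions) or rejected as a duplicate.
     */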
1379    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1380        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1381
1382        // Skip adding the message to the index if this is a topic and there are
1383        // no subscriptions.
1384        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1385            return -1;
1386        }
1387
1388        // Add the message.
1389        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1390        long id = sd.orderIndex.getNextMessageId();
1391        Long previous = sd.locationIndex.put(tx, location, id);
1392        if (previous == null) {
1393            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1394            if (previous == null) {
1395                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1396                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1397                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1398                    addAckLocationForNewMessage(tx, sd, id);
1399                }
1400                metadata.lastUpdate = location;
1401            } else {
1402
1403                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1404                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
1405                    // The message id is already indexed: the broker asked us to store a duplicate before the original was dispatched and acked, so ignore this add attempt.
1406                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1407                }
1408                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1409                sd.locationIndex.remove(tx, location);
1410                id = -1;
1411            }
1412        } else {
1413            // Restore the previous value. Looks like this was a redo of a previously
1414            // added message. We don't want to assign it a new id as the other indexes would
1415            // then be wrong.
1416            sd.locationIndex.put(tx, location, previous);
1417            // ensure sequence is not broken
1418            sd.orderIndex.revertNextMessageId();
1419            metadata.lastUpdate = location;
1420        }
1421        // record this id in any event, initial send or recovery
1422        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1423
1424        return id;
1425    }
1426
1427    void trackPendingAdd(KahaDestination destination, Long seq) {
1428        StoredDestination sd = storedDestinations.get(key(destination));
1429        if (sd != null) {
1430            sd.trackPendingAdd(seq);
1431        }
1432    }
1433
1434    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1435        StoredDestination sd = storedDestinations.get(key(destination));
1436        if (sd != null) {
1437            sd.trackPendingAddComplete(seq);
1438        }
1439    }
1440
1441    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1442        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1443        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1444
1445        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1446        if (id != null) {
1447            MessageKeys previousKeys = sd.orderIndex.put(
1448                    tx,
1449                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1450                    id,
1451                    new MessageKeys(command.getMessageId(), location)
1452            );
1453            sd.locationIndex.put(tx, location, id);
1454            incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1455
1456            if (previousKeys != null) {
1457                //Remove the existing from the size
1458                decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
1459
1460                // On the first update, previous is the original location; on recovery/replay it may be the updated location.
1461                if(!previousKeys.location.equals(location)) {
1462                    sd.locationIndex.remove(tx, previousKeys.location);
1463                }
1464            }
1465            metadata.lastUpdate = location;
1466        } else {
1467            //Add the message if it can't be found
1468            this.updateIndex(tx, command, location);
1469        }
1470    }
1471
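    /**
     * Removes (acks) a message from the index. For queues the message is dropped from all
     * indexes; for topic subscriptions the subscription's last ack is advanced and the
     * message is only removed once no subscription still references it.
     */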
1472    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1473        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1474        if (!command.hasSubscriptionKey()) {
1475
1476            // In the queue case we just remove the message from the index..
1477            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1478            if (sequenceId != null) {
1479                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1480                if (keys != null) {
1481                    sd.locationIndex.remove(tx, keys.location);
1482                    decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
1483                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1484                    metadata.lastUpdate = ackLocation;
1485                }  else if (LOG.isDebugEnabled()) {
1486                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1487                }
1488            } else if (LOG.isDebugEnabled()) {
1489                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1490            }
1491        } else {
1492            // In the topic case we need to remove the message once it's been acked
1493            // by all the subs.
1494            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1495
1496            // Make sure it's a valid message id...
1497            if (sequence != null) {
1498                String subscriptionKey = command.getSubscriptionKey();
1499                if (command.getAck() != UNMATCHED) {
1500                    sd.orderIndex.get(tx, sequence);
1501                    byte priority = sd.orderIndex.lastGetPriority();
1502                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1503                }
1504
1505                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1506                if (keys != null) {
1507                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1508                }
1509                // The following method handles deleting un-referenced messages.
1510                removeAckLocation(command, tx, sd, subscriptionKey, sequence);
1511                metadata.lastUpdate = ackLocation;
1512            } else if (LOG.isDebugEnabled()) {
1513                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1514            }
1515
1516        }
1517    }
1518
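    /**
     * Records, against the journal file containing an ack, the journal file that holds the
     * acked message, so that cleanup does not drop an ack file while a message file it
     * references is still in use. Illustrative example: an ack written to data file 12 for
     * a message stored in data file 7 results in the map entry {@code 12 -> {7}} in
     * {@code metadata.ackMessageFileMap}.
     */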
1519    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1520        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1521        if (referenceFileIds == null) {
1522            referenceFileIds = new HashSet<Integer>();
1523            referenceFileIds.add(messageLocation.getDataFileId());
1524            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1525        } else {
1526            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1527            if (!referenceFileIds.contains(id)) {
1528                referenceFileIds.add(id);
1529            }
1530        }
1531    }
1532
1533    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1534        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1535        sd.orderIndex.remove(tx);
1536
1537        sd.locationIndex.clear(tx);
1538        sd.locationIndex.unload(tx);
1539        tx.free(sd.locationIndex.getPageId());
1540
1541        sd.messageIdIndex.clear(tx);
1542        sd.messageIdIndex.unload(tx);
1543        tx.free(sd.messageIdIndex.getPageId());
1544
1545        if (sd.subscriptions != null) {
1546            sd.subscriptions.clear(tx);
1547            sd.subscriptions.unload(tx);
1548            tx.free(sd.subscriptions.getPageId());
1549
1550            sd.subscriptionAcks.clear(tx);
1551            sd.subscriptionAcks.unload(tx);
1552            tx.free(sd.subscriptionAcks.getPageId());
1553
1554            sd.ackPositions.clear(tx);
1555            sd.ackPositions.unload(tx);
1556            tx.free(sd.ackPositions.getHeadPageId());
1557
1558            sd.subLocations.clear(tx);
1559            sd.subLocations.unload(tx);
1560            tx.free(sd.subLocations.getHeadPageId());
1561        }
1562
1563        String key = key(command.getDestination());
1564        storedDestinations.remove(key);
1565        metadata.destinations.remove(tx, key);
1566        clearStoreStats(command.getDestination());
1567        storeCache.remove(key(command.getDestination()));
1568    }
1569
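    /**
     * Adds or removes a durable subscription. When subscription info is present the sub and
     * its starting ack position are recorded (retroactive subs pick up existing messages);
     * otherwise the sub is deleted and, if it was the last one, the whole stored destination
     * is removed.
     */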
1570    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1571        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1572        final String subscriptionKey = command.getSubscriptionKey();
1573
1574        // If subscription info is set we are creating the sub, otherwise we are destroying it.
1575        if (command.hasSubscriptionInfo()) {
1576            Location existing = sd.subLocations.get(tx, subscriptionKey);
1577            if (existing != null && existing.compareTo(location) == 0) {
1578                // replay on recovery, ignore
1579                LOG.trace("ignoring journal replay of sub from: " + location);
1580                return;
1581            }
1582
1583            sd.subscriptions.put(tx, subscriptionKey, command);
1584            sd.subLocations.put(tx, subscriptionKey, location);
1585            long ackLocation = NOT_ACKED;
1586            if (!command.getRetroactive()) {
1587                ackLocation = sd.orderIndex.nextMessageId - 1;
1588            } else {
1589                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1590            }
1591            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1592            sd.subscriptionCache.add(subscriptionKey);
1593        } else {
1594            // delete the sub...
1595            sd.subscriptions.remove(tx, subscriptionKey);
1596            sd.subLocations.remove(tx, subscriptionKey);
1597            sd.subscriptionAcks.remove(tx, subscriptionKey);
1598            sd.subscriptionCache.remove(subscriptionKey);
1599            removeAckLocationsForSub(command, tx, sd, subscriptionKey);
1600
1601            if (sd.subscriptions.isEmpty(tx)) {
1602                // remove the stored destination
1603                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1604                removeDestinationCommand.setDestination(command.getDestination());
1605                updateIndex(tx, removeDestinationCommand, null);
1606                clearStoreStats(command.getDestination());
1607            }
1608        }
1609    }
1610
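    /**
     * Runs a checkpoint under the checkpoint and index write locks, delegating to
     * {@link #checkpointUpdate(Transaction, boolean)} inside a page-file transaction.
     */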
1611    private void checkpointUpdate(final boolean cleanup) throws IOException {
1612        checkpointLock.writeLock().lock();
1613        try {
1614            this.indexLock.writeLock().lock();
1615            try {
1616                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1617                    @Override
1618                    public void execute(Transaction tx) throws IOException {
1619                        checkpointUpdate(tx, cleanup);
1620                    }
1621                });
1622            } finally {
1623                this.indexLock.writeLock().unlock();
1624            }
1625
1626        } finally {
1627            checkpointLock.writeLock().unlock();
1628        }
1629    }
1630
1631    /**
1632     * Performs the checkpoint work inside an index transaction, optionally garbage
1633     * collecting journal data files that are no longer referenced.
1634     */
1635    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1636        LOG.debug("Checkpoint started.");
1637
1638        // reflect last update exclusive of current checkpoint
1639        Location lastUpdate = metadata.lastUpdate;
1640
1641        metadata.state = OPEN_STATE;
1642        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1643        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1644        Location[] inProgressTxRange = getInProgressTxLocationRange();
1645        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1646        tx.store(metadata.page, metadataMarshaller, true);
1647        pageFile.flush();
1648
1649        if (cleanup) {
1650
1651            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1652            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1653
1654            if (LOG.isTraceEnabled()) {
1655                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1656            }
1657
1658            if (lastUpdate != null) {
1659                gcCandidateSet.remove(lastUpdate.getDataFileId());
1660            }
1661
1662            // Don't GC files under replication
1663            if (!journalFilesBeingReplicated.isEmpty()) {
1664                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1665            }
1666
1667            if (metadata.producerSequenceIdTrackerLocation != null) {
1668                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1669                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1670                    // rewrite so we don't prevent gc
1671                    metadata.producerSequenceIdTracker.setModified(true);
1672                    if (LOG.isTraceEnabled()) {
1673                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1674                    }
1675                }
1676                gcCandidateSet.remove(dataFileId);
1677                if (LOG.isTraceEnabled()) {
1678                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet);
1679                }
1680            }
1681
1682            if (metadata.ackMessageFileMapLocation != null) {
1683                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1684                gcCandidateSet.remove(dataFileId);
1685                if (LOG.isTraceEnabled()) {
1686                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
1687                }
1688            }
1689
1690            // Don't GC files referenced by in-progress tx
1691            if (inProgressTxRange[0] != null) {
1692                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1693                    gcCandidateSet.remove(pendingTx);
1694                }
1695            }
1696            if (LOG.isTraceEnabled()) {
1697                LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1698            }
1699
1700            // Go through all the destinations to see if any of them can remove GC candidates.
1701            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1702                if( gcCandidateSet.isEmpty() ) {
1703                    break;
1704                }
1705
1706                // Use a visitor to cut down the number of pages that we load
1707                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1708                    int last=-1;
1709                    @Override
1710                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1711                        if( first==null ) {
1712                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1713                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1714                                subset.remove(second.getDataFileId());
1715                            }
1716                            return !subset.isEmpty();
1717                        } else if( second==null ) {
1718                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1719                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1720                                subset.remove(first.getDataFileId());
1721                            }
1722                            return !subset.isEmpty();
1723                        } else {
1724                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1725                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1726                                subset.remove(first.getDataFileId());
1727                            }
1728                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1729                                subset.remove(second.getDataFileId());
1730                            }
1731                            return !subset.isEmpty();
1732                        }
1733                    }
1734
1735                    @Override
1736                    public void visit(List<Location> keys, List<Long> values) {
1737                        for (Location l : keys) {
1738                            int fileId = l.getDataFileId();
1739                            if( last != fileId ) {
1740                                gcCandidateSet.remove(fileId);
1741                                last = fileId;
1742                            }
1743                        }
1744                    }
1745                });
1746
1747                // Durable Subscription
1748                if (entry.getValue().subLocations != null) {
1749                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1750                    while (iter.hasNext()) {
1751                        Entry<String, Location> subscription = iter.next();
1752                        int dataFileId = subscription.getValue().getDataFileId();
1753
1754                        // Move the subscription along if it has no outstanding messages that need to be acked
1755                        // and it sits in the oldest journal file still eligible for GC.
1756                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1757                            final StoredDestination destination = entry.getValue();
1758                            final String subscriptionKey = subscription.getKey();
1759                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1760
1761                            // A pending set of size one holding only the next message id means there
1762                            // are currently no pending messages.
1763                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1764                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1765
1766                                if (LOG.isTraceEnabled()) {
1767                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1768                                }
1769
1770                                final KahaSubscriptionCommand kahaSub =
1771                                    destination.subscriptions.get(tx, subscriptionKey);
1772                                destination.subLocations.put(
1773                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1774
1775                                // Skip the remove from the candidates if we rewrote the subscription,
1776                                // in order to prevent duplicate subscription commands on recovery.
1777                                // If another subscription is on the same file and isn't rewritten
1778                                // then it will remove the file from the set.
1779                                continue;
1780                            }
1781                        }
1782
1783                        gcCandidateSet.remove(dataFileId);
1784                    }
1785                }
1786
1787                if (LOG.isTraceEnabled()) {
1788                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1789                }
1790            }
1791
1792            // Check that we are not deleting a file whose acks still refer to in-use journal files.
1793            if (LOG.isTraceEnabled()) {
1794                LOG.trace("gc candidates: " + gcCandidateSet);
1795                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1796            }
1797
1798            boolean ackMessageFileMapMod = false;
1799            Iterator<Integer> candidates = gcCandidateSet.iterator();
1800            while (candidates.hasNext()) {
1801                Integer candidate = candidates.next();
1802                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1803                if (referencedFileIds != null) {
1804                    for (Integer referencedFileId : referencedFileIds) {
1805                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1806                            // an active file not targeted for deletion is still referenced, so don't delete
1807                            candidates.remove();
1808                            break;
1809                        }
1810                    }
1811                    if (gcCandidateSet.contains(candidate)) {
1812                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1813                    } else {
1814                        if (LOG.isTraceEnabled()) {
1815                            LOG.trace("not removing data file: " + candidate
1816                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1817                        }
1818                    }
1819                }
1820            }
1821
1822            if (!gcCandidateSet.isEmpty()) {
1823                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1824                journal.removeDataFiles(gcCandidateSet);
1825                for (Integer candidate : gcCandidateSet) {
1826                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1827                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1828                    }
1829                }
1830                if (ackMessageFileMapMod) {
1831                    checkpointUpdate(tx, false);
1832                }
1833            } else if (isEnableAckCompaction()) {
1834                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1835                    // First check length of journal to make sure it makes sense to even try.
1836                    //
1837                    // If there is only one journal file with Acks in it we don't need to move
1838                    // it since it won't be chained to any later logs.
1839                    //
1840                    // If the logs haven't grown since the last check then we need to compact;
1841                    // otherwise there still seems to be room for growth and we don't need to incur
1842                    // the overhead.  Depending on configuration this check can be skipped, in which
1843                    // case Ack compaction will run any time the store has not GC'd a journal file in
1844                    // the configured number of cycles.
1845                    if (metadata.ackMessageFileMap.size() > 1 &&
1846                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1847
1848                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
1849                        try {
1850                            scheduler.execute(new AckCompactionRunner());
1851                        } catch (Exception ex) {
1852                            LOG.warn("Error on queueing the Ack Compactor", ex);
1853                        }
1854                    } else {
1855                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1856                    }
1857
1858                    checkPointCyclesWithNoGC = 0;
1859                } else {
1860                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1861                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1862                }
1863
1864                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
1865            }
1866        }
1867
1868        LOG.debug("Checkpoint done.");
1869    }
1870
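    /**
     * Background task that forwards the acks held in the oldest eligible journal file into a
     * freshly reserved file so the old file can be garbage collected on a later checkpoint.
     */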
1871    private final class AckCompactionRunner implements Runnable {
1872
1873        @Override
1874        public void run() {
1875
1876            int journalToAdvance = -1;
1877            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
1878
1879            // Lock index to capture the ackMessageFileMap data
1880            indexLock.writeLock().lock();
1881
1882            try {
1883                // Map keys might not be sorted; find the earliest log file to forward acks
1884                // from and move only those. Future cycles can chip away at more as needed.
1885                // We won't move files that were themselves rewritten by a previous compaction.
1886                List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
1887                Collections.sort(journalFileIds);
1888                for (Integer journalFileId : journalFileIds) {
1889                    DataFile current = journal.getDataFileById(journalFileId);
1890                    if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
1891                        journalToAdvance = journalFileId;
1892                        break;
1893                    }
1894                }
1895
1896                // Check if we found one, or if we only found the current file being written to.
1897                if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
1898                    return;
1899                }
1900
1901                journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
1902
1903            } finally {
1904                indexLock.writeLock().unlock();
1905            }
1906
1907            try {
1908                // Background rewrite of the old acks
1909                forwardAllAcks(journalToAdvance, journalLogsReferenced);
1910
1911                // Checkpoint with changes from the ackMessageFileMap
1912                checkpointUpdate(false);
1913            } catch (IOException ioe) {
1914                LOG.error("Checkpoint failed", ioe);
1915                brokerService.handleIOException(ioe);
1916            } catch (Throwable e) {
1917                LOG.error("Checkpoint failed", e);
1918                brokerService.handleIOException(IOExceptionSupport.create(e));
1919            }
1920        }
1921    }
1922
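    /**
     * Rewrites every KahaRemoveMessageCommand found in the given journal file into a newly
     * reserved data file, then updates ackMessageFileMap so the old file can be GC'd.
     */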
1923    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
1924        LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
1925
1926        DataFile forwardsFile = journal.reserveDataFile();
1927        LOG.trace("Reserved new file for forwarded acks: {}", forwardsFile);
1928
1929        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
1930
1931        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile)) {
1932            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
1933            compactionMarker.setSourceDataFileId(journalToRead);
1934            compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE);
1935
1936            ByteSequence payload = toByteSequence(compactionMarker);
1937            appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
1938            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
1939
1940            Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0));
1941            while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
1942                JournalCommand<?> command = null;
1943                try {
1944                    command = load(nextLocation);
1945                } catch (IOException ex) {
1946                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
1947                }
1948
1949                if (command instanceof KahaRemoveMessageCommand) {
1950                    payload = toByteSequence(command);
1951                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
1952                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
1953                }
1954
1955                nextLocation = journal.getNextLocation(nextLocation);
1956            }
1957        }
1958
1959        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
1960
1961        // Lock the index while we update the ackMessageFileMap.
1962        indexLock.writeLock().lock();
1963        try {
1964            // Update the ack map with the new locations of the acks
1965            for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
1966                Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
1967                if (referenceFileIds == null) {
1968                    referenceFileIds = new HashSet<Integer>();
1969                    referenceFileIds.addAll(entry.getValue());
1970                    metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
1971                } else {
1972                    referenceFileIds.addAll(entry.getValue());
1973                }
1974            }
1975            // Remove the old location data from the ack map so that the old journal
1976            // log file can be removed on the next GC.
1977            metadata.ackMessageFileMap.remove(journalToRead);
1978        } finally {
1979            indexLock.writeLock().unlock();
1980        }
1981
1982        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
1983    }
1984
1985    final Runnable nullCompletionCallback = new Runnable() {
1986        @Override
1987        public void run() {
1988        }
1989    };
1990
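    /**
     * Writes the producer sequence-id audit to the journal when it has been modified and
     * returns the location of the current audit record.
     */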
1991    private Location checkpointProducerAudit() throws IOException {
1992        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
1993            ByteArrayOutputStream baos = new ByteArrayOutputStream();
1994            ObjectOutputStream oout = new ObjectOutputStream(baos);
1995            oout.writeObject(metadata.producerSequenceIdTracker);
1996            oout.flush();
1997            oout.close();
1998            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1999            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2000            try {
2001                location.getLatch().await();
2002            } catch (InterruptedException e) {
2003                throw new InterruptedIOException(e.toString());
2004            }
2005            return location;
2006        }
2007        return metadata.producerSequenceIdTrackerLocation;
2008    }
2009
2010    private Location checkpointAckMessageFileMap() throws IOException {
2011        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2012        ObjectOutputStream oout = new ObjectOutputStream(baos);
2013        oout.writeObject(metadata.ackMessageFileMap);
2014        oout.flush();
2015        oout.close();
2016        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2017        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2018        try {
2019            location.getLatch().await();
2020        } catch (InterruptedException e) {
2021            throw new InterruptedIOException(e.toString());
2022        }
2023        return location;
2024    }
2025
2026    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2027
2028        ByteSequence sequence = toByteSequence(subscription);
2029        Location location = journal.write(sequence, nullCompletionCallback) ;
2030
2031        try {
2032            location.getLatch().await();
2033        } catch (InterruptedException e) {
2034            throw new InterruptedIOException(e.toString());
2035        }
2036        return location;
2037    }
2038
2039    public HashSet<Integer> getJournalFilesBeingReplicated() {
2040        return journalFilesBeingReplicated;
2041    }
2042
2043    // /////////////////////////////////////////////////////////////////
2044    // StoredDestination related implementation methods.
2045    // /////////////////////////////////////////////////////////////////
2046
2047    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
2048
2049    static class MessageKeys {
2050        final String messageId;
2051        final Location location;
2052
2053        public MessageKeys(String messageId, Location location) {
2054            this.messageId=messageId;
2055            this.location=location;
2056        }
2057
2058        @Override
2059        public String toString() {
2060            return "["+messageId+","+location+"]";
2061        }
2062    }
2063
2064    protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2065        final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
2066
2067        @Override
2068        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2069            return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
2070        }
2071
2072        @Override
2073        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2074            dataOut.writeUTF(object.messageId);
2075            locationSizeMarshaller.writePayload(object.location, dataOut);
2076        }
2077    }
2078
2079    class LastAck {
2080        long lastAckedSequence;
2081        byte priority;
2082
2083        public LastAck(LastAck source) {
2084            this.lastAckedSequence = source.lastAckedSequence;
2085            this.priority = source.priority;
2086        }
2087
2088        public LastAck() {
2089            this.priority = MessageOrderIndex.HI;
2090        }
2091
2092        public LastAck(long ackLocation) {
2093            this.lastAckedSequence = ackLocation;
2094            this.priority = MessageOrderIndex.LO;
2095        }
2096
2097        public LastAck(long ackLocation, byte priority) {
2098            this.lastAckedSequence = ackLocation;
2099            this.priority = priority;
2100        }
2101
2102        @Override
2103        public String toString() {
2104            return "[" + lastAckedSequence + ":" + priority + "]";
2105        }
2106    }
2107
2108    protected class LastAckMarshaller implements Marshaller<LastAck> {
2109
2110        @Override
2111        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2112            dataOut.writeLong(object.lastAckedSequence);
2113            dataOut.writeByte(object.priority);
2114        }
2115
2116        @Override
2117        public LastAck readPayload(DataInput dataIn) throws IOException {
2118            LastAck lastAcked = new LastAck();
2119            lastAcked.lastAckedSequence = dataIn.readLong();
2120            if (metadata.version >= 3) {
2121                lastAcked.priority = dataIn.readByte();
2122            }
2123            return lastAcked;
2124        }
2125
2126        @Override
2127        public int getFixedSize() {
2128            return 9;
2129        }
2130
2131        @Override
2132        public LastAck deepCopy(LastAck source) {
2133            return new LastAck(source);
2134        }
2135
2136        @Override
2137        public boolean isDeepCopySupported() {
2138            return true;
2139        }
2140    }
2141
2142    class StoredDestination {
2143
2144        MessageOrderIndex orderIndex = new MessageOrderIndex();
2145        BTreeIndex<Location, Long> locationIndex;
2146        BTreeIndex<String, Long> messageIdIndex;
2147
2148        // These bits are only set for Topics
2149        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
2150        BTreeIndex<String, LastAck> subscriptionAcks;
2151        HashMap<String, MessageOrderCursor> subscriptionCursors;
2152        ListIndex<String, SequenceSet> ackPositions;
2153        ListIndex<String, Location> subLocations;
2154
2155        // Transient data used to track which Messages are no longer needed.
2156        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
2157        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
2158
2159        public void trackPendingAdd(Long seq) {
2160            orderIndex.trackPendingAdd(seq);
2161        }
2162
2163        public void trackPendingAddComplete(Long seq) {
2164            orderIndex.trackPendingAddComplete(seq);
2165        }
2166
2167        @Override
2168        public String toString() {
2169            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
2170        }
2171    }
2172
2173    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
2174
2175        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
2176
2177        @Override
2178        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
2179            final StoredDestination value = new StoredDestination();
2180            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2181            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
2182            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
2183
2184            if (dataIn.readBoolean()) {
2185                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
2186                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
2187                if (metadata.version >= 4) {
2188                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
2189                } else {
2190                    // upgrade
2191                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2192                        @Override
2193                        public void execute(Transaction tx) throws IOException {
2194                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
2195
2196                            if (metadata.version >= 3) {
2197                                // migrate
2198                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
2199                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
2200                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
2201                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
2202                                oldAckPositions.load(tx);
2203
2204
2205                                // Do the initial build of the data in memory before writing it into the
2206                                // store-backed ack positions list, to avoid a lot of disk thrashing.
2207                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
2208                                while (iterator.hasNext()) {
2209                                    Entry<Long, HashSet<String>> entry = iterator.next();
2210
2211                                    for(String subKey : entry.getValue()) {
2212                                        SequenceSet pendingAcks = temp.get(subKey);
2213                                        if (pendingAcks == null) {
2214                                            pendingAcks = new SequenceSet();
2215                                            temp.put(subKey, pendingAcks);
2216                                        }
2217
2218                                        pendingAcks.add(entry.getKey());
2219                                    }
2220                                }
2221                            }
2222                            // Now move the pending messages-to-ack data into the store-backed
2223                            // structure.
2224                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2225                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2226                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2227                            value.ackPositions.load(tx);
2228                            for(String subscriptionKey : temp.keySet()) {
2229                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
2230                            }
2231
2232                        }
2233                    });
2234                }
2235
2236                if (metadata.version >= 5) {
2237                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
2238                } else {
2239                    // upgrade
2240                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2241                        @Override
2242                        public void execute(Transaction tx) throws IOException {
2243                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2244                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2245                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2246                            value.subLocations.load(tx);
2247                        }
2248                    });
2249                }
2250            }
2251            if (metadata.version >= 2) {
2252                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2253                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2254            } else {
2255                // upgrade
2256                pageFile.tx().execute(new Transaction.Closure<IOException>() {
2257                    @Override
2258                    public void execute(Transaction tx) throws IOException {
2259                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2260                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2261                        value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2262                        value.orderIndex.lowPriorityIndex.load(tx);
2263
2264                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2265                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2266                        value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2267                        value.orderIndex.highPriorityIndex.load(tx);
2268                    }
2269                });
2270            }
2271
2272            return value;
2273        }
2274
2275        @Override
2276        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
2277            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
2278            dataOut.writeLong(value.locationIndex.getPageId());
2279            dataOut.writeLong(value.messageIdIndex.getPageId());
2280            if (value.subscriptions != null) {
2281                dataOut.writeBoolean(true);
2282                dataOut.writeLong(value.subscriptions.getPageId());
2283                dataOut.writeLong(value.subscriptionAcks.getPageId());
2284                dataOut.writeLong(value.ackPositions.getHeadPageId());
2285                dataOut.writeLong(value.subLocations.getHeadPageId());
2286            } else {
2287                dataOut.writeBoolean(false);
2288            }
2289            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
2290            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
2291        }
2292    }
2293
2294    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2295        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2296
2297        @Override
2298        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2299            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2300            rc.mergeFramed((InputStream)dataIn);
2301            return rc;
2302        }
2303
2304        @Override
2305        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2306            object.writeFramed((OutputStream)dataOut);
2307        }
2308    }
2309
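    /**
     * Returns the cached index state for a destination, loading and caching it on first use.
     */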
2310    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2311        String key = key(destination);
2312        StoredDestination rc = storedDestinations.get(key);
2313        if (rc == null) {
2314            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2315            rc = loadStoredDestination(tx, key, topic);
2316            // Cache it. We may want to remove/unload destinations from the
2317            // cache that are not used for a while
2318            // to reduce memory usage.
2319            storedDestinations.put(key, rc);
2320        }
2321        return rc;
2322    }
2323
2324    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2325        String key = key(destination);
2326        StoredDestination rc = storedDestinations.get(key);
2327        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2328            rc = getStoredDestination(destination, tx);
2329        }
2330        return rc;
2331    }
2332
2333    /**
2334     * @param tx
2335     * @param key
2336     * @param topic
2337     * @return
2338     * @throws IOException
2339     */
2340    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2341        // Try to load the existing indexes..
2342        StoredDestination rc = metadata.destinations.get(tx, key);
2343        if (rc == null) {
2344            // Brand new destination.. allocate indexes for it.
2345            rc = new StoredDestination();
2346            rc.orderIndex.allocate(tx);
2347            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
2348            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
2349
2350            if (topic) {
2351                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
2352                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
2353                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2354                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2355            }
2356            metadata.destinations.put(tx, key, rc);
2357        }
2358
2359        // Configure the marshalers and load.
2360        rc.orderIndex.load(tx);
2361
2362        // Figure out the next key using the last entry in the destination.
2363        rc.orderIndex.configureLast(tx);
2364
2365        rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
2366        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2367        rc.locationIndex.load(tx);
2368
2369        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2370        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2371        rc.messageIdIndex.load(tx);
2372
2373        // go through and upgrade the old index if it is older than version 6
2374        if (metadata.version < 6) {
2375            for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) {
2376                Entry<Location, Long> entry = iterator.next();
2377                // modify so it is upgraded
2378                rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
2379            }
2380            //upgrade the order index
2381            for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) {
2382                Entry<Long, MessageKeys> entry = iterator.next();
2383                //call get so that the last priority is updated
2384                rc.orderIndex.get(tx, entry.getKey());
2385                rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
2386            }
2387        }
2388
2389        // If it was a topic...
2390        if (topic) {
2391
2392            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2393            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2394            rc.subscriptions.load(tx);
2395
2396            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2397            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2398            rc.subscriptionAcks.load(tx);
2399
2400            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2401            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2402            rc.ackPositions.load(tx);
2403
2404            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2405            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2406            rc.subLocations.load(tx);
2407
2408            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
2409
2410            if (metadata.version < 3) {
2411
2412                // on upgrade need to fill ackLocation with available messages past last ack
2413                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2414                    Entry<String, LastAck> entry = iterator.next();
2415                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2416                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2417                        Long sequence = orderIterator.next().getKey();
2418                        addAckLocation(tx, rc, sequence, entry.getKey());
2419                    }
2420                    // modify so it is upgraded
2421                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2422                }
2423            }
2424
2425            // Configure the message references index
2426            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2427            while (subscriptions.hasNext()) {
2428                Entry<String, SequenceSet> subscription = subscriptions.next();
2429                SequenceSet pendingAcks = subscription.getValue();
2430                if (pendingAcks != null && !pendingAcks.isEmpty()) {
2431                    Long lastPendingAck = pendingAcks.getTail().getLast();
2432                    for (Long sequenceId : pendingAcks) {
2433                        Long current = rc.messageReferences.get(sequenceId);
2434                        if (current == null) {
2435                            current = Long.valueOf(0L);
2436                        }
2437
2438                        // We always add a trailing empty entry for the next position to start from
2439                        // so we need to ensure we don't count that as a message reference on reload.
2440                        if (!sequenceId.equals(lastPendingAck)) {
2441                            current = current.longValue() + 1;
2442                        } else {
2443                            current = Long.valueOf(0L);
2444                        }
2445
2446                        rc.messageReferences.put(sequenceId, current);
2447                    }
2448                }
2449            }
2450
2451            // Configure the subscription cache
2452            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2453                Entry<String, LastAck> entry = iterator.next();
2454                rc.subscriptionCache.add(entry.getKey());
2455            }
2456
2457            if (rc.orderIndex.nextMessageId == 0) {
2458                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2459                if (!rc.subscriptionAcks.isEmpty(tx)) {
2460                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2461                        Entry<String, LastAck> entry = iterator.next();
2462                        rc.orderIndex.nextMessageId =
2463                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2464                    }
2465                }
2466            } else {
2467                // update based on ackPositions for unmatched, last entry is always the next
2468                if (!rc.messageReferences.isEmpty()) {
2469                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
2470                    rc.orderIndex.nextMessageId =
2471                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
2472                }
2473            }
2474        }
2475
2476        if (metadata.version < VERSION) {
2477            // store again after upgrade
2478            metadata.destinations.put(tx, key, rc);
2479        }
2480        return rc;
2481    }
2482
2483    /**
2484     * Clear the counter for the destination, if one exists.
2485     *
2486     * @param kahaDestination the destination whose cached store statistics should be reset
2487     */
2488    protected void clearStoreStats(KahaDestination kahaDestination) {
2489        MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination));
2490        if (storeStats != null) {
2491            storeStats.reset();
2492        }
2493    }
2494
2495    /**
2496     * Update the MessageStoreStatistics for the destination: increment the message
2497     * count and, when the size is greater than zero, add it to the message size.
2498     * @param kahaDestination the destination whose statistics are updated
2499     * @param size the size of the added message
2500     */
2501    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
2502        incrementAndAddSizeToStoreStat(key(kahaDestination), size);
2503    }
2504
2505    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
2506        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2507        if (storeStats != null) {
2508            storeStats.getMessageCount().increment();
2509            if (size > 0) {
2510                storeStats.getMessageSize().addSize(size);
2511            }
2512        }
2513    }
2514
2515    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
2516        decrementAndSubSizeToStoreStat(key(kahaDestination), size);
2517    }
2518
2519    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
2520        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2521        if (storeStats != null) {
2522            storeStats.getMessageCount().decrement();
2523            if (size > 0) {
2524                storeStats.getMessageSize().addSize(-size);
2525            }
2526        }
2527    }
2528
2529    /**
2530     * This is a map to cache the MessageStore (and with it the MessageStoreStatistics)
2531     * for a specific KahaDestination key.
2532     */
2533    protected final ConcurrentMap<String, MessageStore> storeCache =
2534            new ConcurrentHashMap<String, MessageStore>();
2535
2536    /**
2537     * Locate the MessageStoreStatistics for this KahaDestination key.
2538     * @param kahaDestKey the destination key, as produced by key(KahaDestination)
2539     * @return the statistics of the cached MessageStore, or null if the store is not cached
2540     */
2541    protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
2542        MessageStoreStatistics storeStats = null;
2543        try {
2544            MessageStore messageStore = storeCache.get(kahaDestKey);
2545            if (messageStore != null) {
2546                storeStats = messageStore.getMessageStoreStatistics();
2547            }
2548        } catch (Exception e1) {
2549            LOG.error("Getting the message store statistics for the destination failed", e1);
2550        }
2551
2552        return storeStats;
2553    }
2554
2555    /**
2556     * Determine whether the given Destination matches the KahaDestination type.
2557     *
2558     * @param destination the broker region destination to check
2559     * @param type the KahaDestination type to match against
2560     * @return true if the destination is a Queue or Topic of the given type
2561     */
2562    protected boolean matchType(Destination destination,
2563            KahaDestination.DestinationType type) {
2564        if (destination instanceof Topic
2565                && type.equals(KahaDestination.DestinationType.TOPIC)) {
2566            return true;
2567        } else if (destination instanceof Queue
2568                && type.equals(KahaDestination.DestinationType.QUEUE)) {
2569            return true;
2570        }
2571        return false;
2572    }
2573
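    /**
     * Marshaller for the Location keys of the per-destination location index that also
     * persists the journal entry size. The size is only read back when the index is at
     * version 6 or later; older index versions did not store it.
     */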
2574    class LocationSizeMarshaller implements Marshaller<Location> {
2575
2576        public LocationSizeMarshaller() {
2577
2578        }
2579
2580        @Override
2581        public Location readPayload(DataInput dataIn) throws IOException {
2582            Location rc = new Location();
2583            rc.setDataFileId(dataIn.readInt());
2584            rc.setOffset(dataIn.readInt());
2585            if (metadata.version >= 6) {
2586                rc.setSize(dataIn.readInt());
2587            }
2588            return rc;
2589        }
2590
2591        @Override
2592        public void writePayload(Location object, DataOutput dataOut)
2593                throws IOException {
2594            dataOut.writeInt(object.getDataFileId());
2595            dataOut.writeInt(object.getOffset());
2596            dataOut.writeInt(object.getSize());
2597        }
2598
2599        @Override
2600        public int getFixedSize() {
2601            return 12;
2602        }
2603
2604        @Override
2605        public Location deepCopy(Location source) {
2606            return new Location(source);
2607        }
2608
2609        @Override
2610        public boolean isDeepCopySupported() {
2611            return true;
2612        }
2613    }
2614
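    // Record that the given subscription still needs to ack this message sequence and bump
    // the reference count used to decide when the message itself can be removed.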
2615    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2616        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2617        if (sequences == null) {
2618            sequences = new SequenceSet();
2619            sequences.add(messageSequence);
2620            sd.ackPositions.add(tx, subscriptionKey, sequences);
2621        } else {
2622            sequences.add(messageSequence);
2623            sd.ackPositions.put(tx, subscriptionKey, sequences);
2624        }
2625
2626        Long count = sd.messageReferences.get(messageSequence);
2627        if (count == null) {
2628            count = Long.valueOf(0L);
2629        }
2630        count = count.longValue() + 1;
2631        sd.messageReferences.put(messageSequence, count);
2632    }
2633
2634    // new sub is interested in potentially all existing messages
2635    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2636        SequenceSet allOutstanding = new SequenceSet();
2637        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2638        while (iterator.hasNext()) {
2639            SequenceSet set = iterator.next().getValue();
2640            for (Long entry : set) {
2641                allOutstanding.add(entry);
2642            }
2643        }
2644        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2645
2646        for (Long ackPosition : allOutstanding) {
2647            Long count = sd.messageReferences.get(ackPosition);
2648
2649            // There might not be a reference if the ackLocation was the last
2650            // one which is a placeholder for the next incoming message and
2651            // no value was added to the message references table.
2652            if (count != null) {
2653                count = count.longValue() + 1;
2654                sd.messageReferences.put(ackPosition, count);
2655            }
2656        }
2657    }
2658
2659    // on a new message add, all existing subs are interested in this message
2660    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
2661        for(String subscriptionKey : sd.subscriptionCache) {
2662            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2663            if (sequences == null) {
2664                sequences = new SequenceSet();
2665                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2666                sd.ackPositions.add(tx, subscriptionKey, sequences);
2667            } else {
2668                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2669                sd.ackPositions.put(tx, subscriptionKey, sequences);
2670            }
2671
2672            Long count = sd.messageReferences.get(messageSequence);
2673            if (count == null) {
2674                count = Long.valueOf(0L);
2675            }
2676            count = count.longValue() + 1;
2677            sd.messageReferences.put(messageSequence, count);
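            // The next position is tracked as a zero-count placeholder entry; the reload
            // logic in loadStoredDestination skips this trailing marker when it rebuilds
            // the reference counts.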
2678            sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
2679        }
2680    }
2681
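    // When a subscription is removed, drop its pending ack positions, decrement the reference
    // count of each message it still referenced, and delete any message that is no longer
    // referenced by any other subscription.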
2682    private void removeAckLocationsForSub(KahaSubscriptionCommand command,
2683            Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2684        if (!sd.ackPositions.isEmpty(tx)) {
2685            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2686            if (sequences == null || sequences.isEmpty()) {
2687                return;
2688            }
2689
2690            ArrayList<Long> unreferenced = new ArrayList<Long>();
2691
2692            for(Long sequenceId : sequences) {
2693                Long references = sd.messageReferences.get(sequenceId);
2694                if (references != null) {
2695                    references = references.longValue() - 1;
2696
2697                    if (references.longValue() > 0) {
2698                        sd.messageReferences.put(sequenceId, references);
2699                    } else {
2700                        sd.messageReferences.remove(sequenceId);
2701                        unreferenced.add(sequenceId);
2702                    }
2703                }
2704            }
2705
2706            for(Long sequenceId : unreferenced) {
2707                // Find all the entries that need to get deleted.
2708                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2709                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2710
2711                // Do the actual deletes.
2712                for (Entry<Long, MessageKeys> entry : deletes) {
2713                    sd.locationIndex.remove(tx, entry.getValue().location);
2714                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2715                    sd.orderIndex.remove(tx, entry.getKey());
2716                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2717                }
2718            }
2719        }
2720    }
2721
2722    /**
2723     * @param tx the index transaction to use
2724     * @param sd the stored destination holding the subscription's ack positions
2725     * @param subscriptionKey the subscription that acked the message
2726     * @param messageSequence the acked sequence to remove; the message is deleted once no subscription references it
2727     * @throws IOException on a failure to update the index
2728     */
2729    private void removeAckLocation(KahaRemoveMessageCommand command,
2730            Transaction tx, StoredDestination sd, String subscriptionKey,
2731            Long messageSequence) throws IOException {
2732        // Remove the sub from the previous location set..
2733        if (messageSequence != null) {
2734            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2735            if (range != null && !range.isEmpty()) {
2736                range.remove(messageSequence);
2737                if (!range.isEmpty()) {
2738                    sd.ackPositions.put(tx, subscriptionKey, range);
2739                } else {
2740                    sd.ackPositions.remove(tx, subscriptionKey);
2741                }
2742
2743                // Check if the message is referenced by any other subscription.
2744                Long count = sd.messageReferences.get(messageSequence);
2745                if (count != null) {
2746                    long references = count.longValue() - 1;
2747                    if (references > 0) {
2748                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2749                        return;
2750                    } else {
2751                        sd.messageReferences.remove(messageSequence);
2752                    }
2753                }
2754
2755                // Find all the entries that need to get deleted.
2756                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2757                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2758
2759                // Do the actual deletes.
2760                for (Entry<Long, MessageKeys> entry : deletes) {
2761                    sd.locationIndex.remove(tx, entry.getValue().location);
2762                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2763                    sd.orderIndex.remove(tx, entry.getKey());
2764                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2765                }
2766            }
2767        }
2768    }
2769
2770    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2771        return sd.subscriptionAcks.get(tx, subscriptionKey);
2772    }
2773
2774    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2775        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2776        if (messageSequences != null) {
2777            long result = messageSequences.rangeSize();
2778            // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2779            return result > 0 ? result - 1 : 0;
2780        }
2781
2782        return 0;
2783    }
2784
2785    public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2786        //grab the messages attached to this subscription
2787        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2788
2789        long locationSize = 0;
2790        if (messageSequences != null) {
2791            Sequence head = messageSequences.getHead();
2792            if (head != null) {
2793                //get an iterator over the order index starting at the first unacked message
2794                //and go over each message to add up the size
2795                Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
2796                        new MessageOrderCursor(head.getFirst()));
2797
2798                while (iterator.hasNext()) {
2799                    Entry<Long, MessageKeys> entry = iterator.next();
2800                    locationSize += entry.getValue().location.getSize();
2801                }
2802            }
2803        }
2804
2805        return locationSize;
2806    }
2807
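    // Destination keys are stored in the index as "<destination type number>:<destination name>".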
2808    protected String key(KahaDestination destination) {
2809        return destination.getType().getNumber() + ":" + destination.getName();
2810    }
2811
2812    // /////////////////////////////////////////////////////////////////
2813    // Transaction related implementation methods.
2814    // /////////////////////////////////////////////////////////////////
2815    @SuppressWarnings("rawtypes")
2816    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2817    @SuppressWarnings("rawtypes")
2818    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2819    protected final Set<String> ackedAndPrepared = new HashSet<String>();
2820    protected final Set<String> rolledBackAcks = new HashSet<String>();
2821
2822    // Messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is a rollback;
2823    // until then they are skipped by the store.
2824    // This preserves the 'at most once' XA guarantee.
2825    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2826        this.indexLock.writeLock().lock();
2827        try {
2828            for (MessageAck ack : acks) {
2829                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
2830            }
2831        } finally {
2832            this.indexLock.writeLock().unlock();
2833        }
2834    }
2835
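    // Forget the recovered acks once the transaction outcome is known; on a rollback the
    // message ids are additionally recorded in rolledBackAcks.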
2836    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
2837        if (acks != null) {
2838            this.indexLock.writeLock().lock();
2839            try {
2840                for (MessageAck ack : acks) {
2841                    final String id = ack.getLastMessageId().toProducerKey();
2842                    ackedAndPrepared.remove(id);
2843                    if (rollback) {
2844                        rolledBackAcks.add(id);
2845                    }
2846                }
2847            } finally {
2848                this.indexLock.writeLock().unlock();
2849            }
2850        }
2851    }
2852
2853    @SuppressWarnings("rawtypes")
2854    private List<Operation> getInflightTx(KahaTransactionInfo info) {
2855        TransactionId key = TransactionIdConversion.convert(info);
2856        List<Operation> tx;
2857        synchronized (inflightTransactions) {
2858            tx = inflightTransactions.get(key);
2859            if (tx == null) {
2860                tx = Collections.synchronizedList(new ArrayList<Operation>());
2861                inflightTransactions.put(key, tx);
2862            }
2863        }
2864        return tx;
2865    }
2866
2867    @SuppressWarnings("unused")
2868    private TransactionId key(KahaTransactionInfo transactionInfo) {
2869        return TransactionIdConversion.convert(transactionInfo);
2870    }
2871
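    /**
     * A journaled add/remove command captured as part of an in-flight or prepared transaction,
     * together with its journal location; execute(tx) applies the command to the index.
     */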
2872    abstract class Operation <T extends JournalCommand<T>> {
2873        final T command;
2874        final Location location;
2875
2876        public Operation(T command, Location location) {
2877            this.command = command;
2878            this.location = location;
2879        }
2880
2881        public Location getLocation() {
2882            return location;
2883        }
2884
2885        public T getCommand() {
2886            return command;
2887        }
2888
2889        abstract public void execute(Transaction tx) throws IOException;
2890    }
2891
2892    class AddOperation extends Operation<KahaAddMessageCommand> {
2893        final IndexAware runWithIndexLock;
2894        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
2895            super(command, location);
2896            this.runWithIndexLock = runWithIndexLock;
2897        }
2898
2899        @Override
2900        public void execute(Transaction tx) throws IOException {
2901            long seq = updateIndex(tx, command, location);
2902            if (runWithIndexLock != null) {
2903                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
2904            }
2905        }
2906    }
2907
2908    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
2909
2910        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
2911            super(command, location);
2912        }
2913
2914        @Override
2915        public void execute(Transaction tx) throws IOException {
2916            updateIndex(tx, command, location);
2917        }
2918    }
2919
2920    // /////////////////////////////////////////////////////////////////
2921    // Initialization related implementation methods.
2922    // /////////////////////////////////////////////////////////////////
2923
2924    private PageFile createPageFile() throws IOException {
2925        if (indexDirectory == null) {
2926            indexDirectory = directory;
2927        }
2928        IOHelper.mkdirs(indexDirectory);
2929        PageFile index = new PageFile(indexDirectory, "db");
2930        index.setEnableWriteThread(isEnableIndexWriteAsync());
2931        index.setWriteBatchSize(getIndexWriteBatchSize());
2932        index.setPageCacheSize(indexCacheSize);
2933        index.setUseLFRUEviction(isUseIndexLFRUEviction());
2934        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2935        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2936        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2937        index.setEnablePageCaching(isEnableIndexPageCaching());
2938        return index;
2939    }
2940
2941    private Journal createJournal() throws IOException {
2942        Journal manager = new Journal();
2943        manager.setDirectory(directory);
2944        manager.setMaxFileLength(getJournalMaxFileLength());
2945        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2946        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2947        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2948        manager.setArchiveDataLogs(isArchiveDataLogs());
2949        manager.setSizeAccumulator(journalSize);
2950        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2951        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
2952        manager.setPreallocationStrategy(
2953                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
2954        if (getDirectoryArchive() != null) {
2955            IOHelper.mkdirs(getDirectoryArchive());
2956            manager.setDirectoryArchive(getDirectoryArchive());
2957        }
2958        return manager;
2959    }
2960
2961    private Metadata createMetadata() {
2962        Metadata md = new Metadata();
2963        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
2964        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
2965        return md;
2966    }
2967
2968    protected abstract void configureMetadata();
2969
2970    public int getJournalMaxWriteBatchSize() {
2971        return journalMaxWriteBatchSize;
2972    }
2973
2974    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2975        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2976    }
2977
2978    public File getDirectory() {
2979        return directory;
2980    }
2981
2982    public void setDirectory(File directory) {
2983        this.directory = directory;
2984    }
2985
2986    public boolean isDeleteAllMessages() {
2987        return deleteAllMessages;
2988    }
2989
2990    public void setDeleteAllMessages(boolean deleteAllMessages) {
2991        this.deleteAllMessages = deleteAllMessages;
2992    }
2993
2994    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2995        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2996    }
2997
2998    public int getIndexWriteBatchSize() {
2999        return setIndexWriteBatchSize;
3000    }
3001
3002    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
3003        this.enableIndexWriteAsync = enableIndexWriteAsync;
3004    }
3005
3006    boolean isEnableIndexWriteAsync() {
3007        return enableIndexWriteAsync;
3008    }
3009
3010    public boolean isEnableJournalDiskSyncs() {
3011        return enableJournalDiskSyncs;
3012    }
3013
3014    public void setEnableJournalDiskSyncs(boolean syncWrites) {
3015        this.enableJournalDiskSyncs = syncWrites;
3016    }
3017
3018    public long getCheckpointInterval() {
3019        return checkpointInterval;
3020    }
3021
3022    public void setCheckpointInterval(long checkpointInterval) {
3023        this.checkpointInterval = checkpointInterval;
3024    }
3025
3026    public long getCleanupInterval() {
3027        return cleanupInterval;
3028    }
3029
3030    public void setCleanupInterval(long cleanupInterval) {
3031        this.cleanupInterval = cleanupInterval;
3032    }
3033
3034    public void setJournalMaxFileLength(int journalMaxFileLength) {
3035        this.journalMaxFileLength = journalMaxFileLength;
3036    }
3037
3038    public int getJournalMaxFileLength() {
3039        return journalMaxFileLength;
3040    }
3041
3042    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
3043        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
3044    }
3045
3046    public int getMaxFailoverProducersToTrack() {
3047        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
3048    }
3049
3050    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
3051        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
3052    }
3053
3054    public int getFailoverProducersAuditDepth() {
3055        return this.metadata.producerSequenceIdTracker.getAuditDepth();
3056    }
3057
3058    public PageFile getPageFile() throws IOException {
3059        if (pageFile == null) {
3060            pageFile = createPageFile();
3061        }
3062        return pageFile;
3063    }
3064
3065    public Journal getJournal() throws IOException {
3066        if (journal == null) {
3067            journal = createJournal();
3068        }
3069        return journal;
3070    }
3071
3072    public boolean isFailIfDatabaseIsLocked() {
3073        return failIfDatabaseIsLocked;
3074    }
3075
3076    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
3077        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
3078    }
3079
3080    public boolean isIgnoreMissingJournalfiles() {
3081        return ignoreMissingJournalfiles;
3082    }
3083
3084    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
3085        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
3086    }
3087
3088    public int getIndexCacheSize() {
3089        return indexCacheSize;
3090    }
3091
3092    public void setIndexCacheSize(int indexCacheSize) {
3093        this.indexCacheSize = indexCacheSize;
3094    }
3095
3096    public boolean isCheckForCorruptJournalFiles() {
3097        return checkForCorruptJournalFiles;
3098    }
3099
3100    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
3101        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
3102    }
3103
3104    public boolean isChecksumJournalFiles() {
3105        return checksumJournalFiles;
3106    }
3107
3108    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
3109        this.checksumJournalFiles = checksumJournalFiles;
3110    }
3111
3112    @Override
3113    public void setBrokerService(BrokerService brokerService) {
3114        this.brokerService = brokerService;
3115    }
3116
3117    /**
3118     * @return the archiveDataLogs
3119     */
3120    public boolean isArchiveDataLogs() {
3121        return this.archiveDataLogs;
3122    }
3123
3124    /**
3125     * @param archiveDataLogs the archiveDataLogs to set
3126     */
3127    public void setArchiveDataLogs(boolean archiveDataLogs) {
3128        this.archiveDataLogs = archiveDataLogs;
3129    }
3130
3131    /**
3132     * @return the directoryArchive
3133     */
3134    public File getDirectoryArchive() {
3135        return this.directoryArchive;
3136    }
3137
3138    /**
3139     * @param directoryArchive the directoryArchive to set
3140     */
3141    public void setDirectoryArchive(File directoryArchive) {
3142        this.directoryArchive = directoryArchive;
3143    }
3144
3145    public boolean isArchiveCorruptedIndex() {
3146        return archiveCorruptedIndex;
3147    }
3148
3149    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
3150        this.archiveCorruptedIndex = archiveCorruptedIndex;
3151    }
3152
3153    public float getIndexLFUEvictionFactor() {
3154        return indexLFUEvictionFactor;
3155    }
3156
3157    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
3158        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
3159    }
3160
3161    public boolean isUseIndexLFRUEviction() {
3162        return useIndexLFRUEviction;
3163    }
3164
3165    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
3166        this.useIndexLFRUEviction = useIndexLFRUEviction;
3167    }
3168
3169    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
3170        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
3171    }
3172
3173    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
3174        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
3175    }
3176
3177    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
3178        this.enableIndexPageCaching = enableIndexPageCaching;
3179    }
3180
3181    public boolean isEnableIndexDiskSyncs() {
3182        return enableIndexDiskSyncs;
3183    }
3184
3185    public boolean isEnableIndexRecoveryFile() {
3186        return enableIndexRecoveryFile;
3187    }
3188
3189    public boolean isEnableIndexPageCaching() {
3190        return enableIndexPageCaching;
3191    }
3192
3193    // /////////////////////////////////////////////////////////////////
3194    // Internal cursor, ordering and marshalling support.
3195    // /////////////////////////////////////////////////////////////////
3196
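    /**
     * Tracks the next sequence position to visit in each priority band (default, low and
     * high) when iterating a destination's order index.
     */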
3197    class MessageOrderCursor{
3198        long defaultCursorPosition;
3199        long lowPriorityCursorPosition;
3200        long highPriorityCursorPosition;
3201        MessageOrderCursor(){
3202        }
3203
3204        MessageOrderCursor(long position){
3205            this.defaultCursorPosition=position;
3206            this.lowPriorityCursorPosition=position;
3207            this.highPriorityCursorPosition=position;
3208        }
3209
3210        MessageOrderCursor(MessageOrderCursor other){
3211            this.defaultCursorPosition=other.defaultCursorPosition;
3212            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3213            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3214        }
3215
3216        MessageOrderCursor copy() {
3217            return new MessageOrderCursor(this);
3218        }
3219
3220        void reset() {
3221            this.defaultCursorPosition=0;
3222            this.highPriorityCursorPosition=0;
3223            this.lowPriorityCursorPosition=0;
3224        }
3225
3226        void increment() {
3227            if (defaultCursorPosition!=0) {
3228                defaultCursorPosition++;
3229            }
3230            if (highPriorityCursorPosition!=0) {
3231                highPriorityCursorPosition++;
3232            }
3233            if (lowPriorityCursorPosition!=0) {
3234                lowPriorityCursorPosition++;
3235            }
3236        }
3237
3238        @Override
3239        public String toString() {
3240           return "MessageOrderCursor:[def:" + defaultCursorPosition
3241                   + ", low:" + lowPriorityCursorPosition
3242                   + ", high:" +  highPriorityCursorPosition + "]";
3243        }
3244
3245        public void sync(MessageOrderCursor other) {
3246            this.defaultCursorPosition=other.defaultCursorPosition;
3247            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3248            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3249        }
3250    }
3251
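    /**
     * The per-destination order index. Messages are kept in one of three BTree indexes keyed
     * by sequence id and chosen by JMS priority (low, default, high); the low and high
     * priority indexes are only allocated for index version 2 and later. Iteration visits
     * the high priority band first, then default, then low.
     */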
3252    class MessageOrderIndex {
3253        static final byte HI = 9;
3254        static final byte LO = 0;
3255        static final byte DEF = 4;
3256
3257        long nextMessageId;
3258        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3259        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3260        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3261        final MessageOrderCursor cursor = new MessageOrderCursor();
3262        Long lastDefaultKey;
3263        Long lastHighKey;
3264        Long lastLowKey;
3265        byte lastGetPriority;
3266        final List<Long> pendingAdditions = new LinkedList<Long>();
3267        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
3268
3269        MessageKeys remove(Transaction tx, Long key) throws IOException {
3270            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3271            if (result == null && highPriorityIndex!=null) {
3272                result = highPriorityIndex.remove(tx, key);
3273                if (result ==null && lowPriorityIndex!=null) {
3274                    result = lowPriorityIndex.remove(tx, key);
3275                }
3276            }
3277            return result;
3278        }
3279
3280        void load(Transaction tx) throws IOException {
3281            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3282            defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3283            defaultPriorityIndex.load(tx);
3284            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3285            lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3286            lowPriorityIndex.load(tx);
3287            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3288            highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3289            highPriorityIndex.load(tx);
3290        }
3291
3292        void allocate(Transaction tx) throws IOException {
3293            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3294            if (metadata.version >= 2) {
3295                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3296                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3297            }
3298        }
3299
3300        void configureLast(Transaction tx) throws IOException {
3301            // Figure out the next key using the last entry in the destination.
3302            TreeSet<Long> orderedSet = new TreeSet<Long>();
3303
3304            addLast(orderedSet, highPriorityIndex, tx);
3305            addLast(orderedSet, defaultPriorityIndex, tx);
3306            addLast(orderedSet, lowPriorityIndex, tx);
3307
3308            if (!orderedSet.isEmpty()) {
3309                nextMessageId = orderedSet.last() + 1;
3310            }
3311        }
3312
3313        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3314            if (index != null) {
3315                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3316                if (lastEntry != null) {
3317                    orderedSet.add(lastEntry.getKey());
3318                }
3319            }
3320        }
3321
3322        void clear(Transaction tx) throws IOException {
3323            this.remove(tx);
3324            this.resetCursorPosition();
3325            this.allocate(tx);
3326            this.load(tx);
3327            this.configureLast(tx);
3328        }
3329
3330        void remove(Transaction tx) throws IOException {
3331            defaultPriorityIndex.clear(tx);
3332            defaultPriorityIndex.unload(tx);
3333            tx.free(defaultPriorityIndex.getPageId());
3334            if (lowPriorityIndex != null) {
3335                lowPriorityIndex.clear(tx);
3336                lowPriorityIndex.unload(tx);
3337
3338                tx.free(lowPriorityIndex.getPageId());
3339            }
3340            if (highPriorityIndex != null) {
3341                highPriorityIndex.clear(tx);
3342                highPriorityIndex.unload(tx);
3343                tx.free(highPriorityIndex.getPageId());
3344            }
3345        }
3346
3347        void resetCursorPosition() {
3348            this.cursor.reset();
3349            lastDefaultKey = null;
3350            lastHighKey = null;
3351            lastLowKey = null;
3352        }
3353
3354        void setBatch(Transaction tx, Long sequence) throws IOException {
3355            if (sequence != null) {
3356                Long nextPosition = Long.valueOf(sequence.longValue() + 1);
3357                lastDefaultKey = sequence;
3358                cursor.defaultCursorPosition = nextPosition.longValue();
3359                lastHighKey = sequence;
3360                cursor.highPriorityCursorPosition = nextPosition.longValue();
3361                lastLowKey = sequence;
3362                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3363            }
3364        }
3365
3366        void setBatch(Transaction tx, LastAck last) throws IOException {
3367            setBatch(tx, last.lastAckedSequence);
3368            if (cursor.defaultCursorPosition == 0
3369                    && cursor.highPriorityCursorPosition == 0
3370                    && cursor.lowPriorityCursorPosition == 0) {
3371                long next = last.lastAckedSequence + 1;
3372                switch (last.priority) {
3373                    case DEF:
3374                        cursor.defaultCursorPosition = next;
3375                        cursor.highPriorityCursorPosition = next;
3376                        break;
3377                    case HI:
3378                        cursor.highPriorityCursorPosition = next;
3379                        break;
3380                    case LO:
3381                        cursor.lowPriorityCursorPosition = next;
3382                        cursor.defaultCursorPosition = next;
3383                        cursor.highPriorityCursorPosition = next;
3384                        break;
3385                }
3386            }
3387        }
3388
3389        void stoppedIterating() {
3390            if (lastDefaultKey!=null) {
3391                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3392            }
3393            if (lastHighKey!=null) {
3394                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3395            }
3396            if (lastLowKey!=null) {
3397                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3398            }
3399            lastDefaultKey = null;
3400            lastHighKey = null;
3401            lastLowKey = null;
3402        }
3403
3404        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3405                throws IOException {
3406            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3407                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3408            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3409                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3410            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3411                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3412            }
3413        }
3414
3415        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3416                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3417
3418            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3419            deletes.add(iterator.next());
3420        }
3421
3422        long getNextMessageId() {
3423            return nextMessageId++;
3424        }
3425
3426        void revertNextMessageId() {
3427            nextMessageId--;
3428        }
3429
3430        MessageKeys get(Transaction tx, Long key) throws IOException {
3431            MessageKeys result = defaultPriorityIndex.get(tx, key);
3432            if (result == null) {
3433                result = highPriorityIndex.get(tx, key);
3434                if (result == null) {
3435                    result = lowPriorityIndex.get(tx, key);
3436                    lastGetPriority = LO;
3437                } else {
3438                    lastGetPriority = HI;
3439                }
3440            } else {
3441                lastGetPriority = DEF;
3442            }
3443            return result;
3444        }
3445
3446        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3447            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3448                return defaultPriorityIndex.put(tx, key, value);
3449            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3450                return highPriorityIndex.put(tx, key, value);
3451            } else {
3452                return lowPriorityIndex.put(tx, key, value);
3453            }
3454        }
3455
3456        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3457            return new MessageOrderIterator(tx,cursor,this);
3458        }
3459
3460        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3461            return new MessageOrderIterator(tx,m,this);
3462        }
3463
3464        public byte lastGetPriority() {
3465            return lastGetPriority;
3466        }
3467
3468        public boolean alreadyDispatched(Long sequence) {
3469            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3470                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3471                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3472        }
3473
3474        public void trackPendingAdd(Long seq) {
3475            synchronized (pendingAdditions) {
3476                pendingAdditions.add(seq);
3477            }
3478        }
3479
3480        public void trackPendingAddComplete(Long seq) {
3481            synchronized (pendingAdditions) {
3482                pendingAdditions.remove(seq);
3483            }
3484        }
3485
3486        public Long minPendingAdd() {
3487            synchronized (pendingAdditions) {
3488                if (!pendingAdditions.isEmpty()) {
3489                    return pendingAdditions.get(0);
3490                } else {
3491                    return null;
3492                }
3493            }
3494        }
3495
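        /**
         * Iterates the pending entries of the priority indexes in order (high, then default,
         * then low), starting at the supplied cursor positions and bounded by the lowest
         * in-flight add (minPendingAdd) so iteration does not run past sequences whose adds
         * are still pending.
         */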
3496        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3497            Iterator<Entry<Long, MessageKeys>>currentIterator;
3498            final Iterator<Entry<Long, MessageKeys>>highIterator;
3499            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3500            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3501
3502            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3503                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3504                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3505                if (highPriorityIndex != null) {
3506                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3507                } else {
3508                    this.highIterator = null;
3509                }
3510                if (lowPriorityIndex != null) {
3511                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3512                } else {
3513                    this.lowIterator = null;
3514                }
3515            }
3516
3517            @Override
3518            public boolean hasNext() {
3519                if (currentIterator == null) {
3520                    if (highIterator != null) {
3521                        if (highIterator.hasNext()) {
3522                            currentIterator = highIterator;
3523                            return currentIterator.hasNext();
3524                        }
3525                        if (defaultIterator.hasNext()) {
3526                            currentIterator = defaultIterator;
3527                            return currentIterator.hasNext();
3528                        }
3529                        if (lowIterator.hasNext()) {
3530                            currentIterator = lowIterator;
3531                            return currentIterator.hasNext();
3532                        }
3533                        return false;
3534                    } else {
3535                        currentIterator = defaultIterator;
3536                        return currentIterator.hasNext();
3537                    }
3538                }
3539                if (highIterator != null) {
3540                    if (currentIterator.hasNext()) {
3541                        return true;
3542                    }
3543                    if (currentIterator == highIterator) {
3544                        if (defaultIterator.hasNext()) {
3545                            currentIterator = defaultIterator;
3546                            return currentIterator.hasNext();
3547                        }
3548                        if (lowIterator.hasNext()) {
3549                            currentIterator = lowIterator;
3550                            return currentIterator.hasNext();
3551                        }
3552                        return false;
3553                    }
3554
3555                    if (currentIterator == defaultIterator) {
3556                        if (lowIterator.hasNext()) {
3557                            currentIterator = lowIterator;
3558                            return currentIterator.hasNext();
3559                        }
3560                        return false;
3561                    }
3562                }
3563                return currentIterator.hasNext();
3564            }
3565
3566            @Override
3567            public Entry<Long, MessageKeys> next() {
3568                Entry<Long, MessageKeys> result = currentIterator.next();
3569                if (result != null) {
3570                    Long key = result.getKey();
3571                    if (highIterator != null) {
3572                        if (currentIterator == defaultIterator) {
3573                            lastDefaultKey = key;
3574                        } else if (currentIterator == highIterator) {
3575                            lastHighKey = key;
3576                        } else {
3577                            lastLowKey = key;
3578                        }
3579                    } else {
3580                        lastDefaultKey = key;
3581                    }
3582                }
3583                return result;
3584            }
3585
3586            @Override
3587            public void remove() {
3588                throw new UnsupportedOperationException();
3589            }
3590        }
3591    }
3592
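    /**
     * Marshals a HashSet of Strings using standard Java serialization, prefixed with the
     * length of the serialized form.
     */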
3593    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3594        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3595
3596        @Override
3597        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3598            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3599            ObjectOutputStream oout = new ObjectOutputStream(baos);
3600            oout.writeObject(object);
3601            oout.flush();
3602            oout.close();
3603            byte[] data = baos.toByteArray();
3604            dataOut.writeInt(data.length);
3605            dataOut.write(data);
3606        }
3607
3608        @Override
3609        @SuppressWarnings("unchecked")
3610        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3611            int dataLen = dataIn.readInt();
3612            byte[] data = new byte[dataLen];
3613            dataIn.readFully(data);
3614            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3615            ObjectInputStream oin = new ObjectInputStream(bais);
3616            try {
3617                return (HashSet<String>) oin.readObject();
3618            } catch (ClassNotFoundException cfe) {
3619                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3620                ioe.initCause(cfe);
3621                throw ioe;
3622            }
3623        }
3624    }
3625
3626    public File getIndexDirectory() {
3627        return indexDirectory;
3628    }
3629
3630    public void setIndexDirectory(File indexDirectory) {
3631        this.indexDirectory = indexDirectory;
3632    }
3633
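    /**
     * Callback invoked (by AddOperation) with the assigned message sequence while the
     * index lock is still held.
     */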
3634    interface IndexAware {
3635        public void sequenceAssignedWithIndexLocked(long index);
3636    }
3637
3638    public String getPreallocationScope() {
3639        return preallocationScope;
3640    }
3641
3642    public void setPreallocationScope(String preallocationScope) {
3643        this.preallocationScope = preallocationScope;
3644    }
3645
3646    public String getPreallocationStrategy() {
3647        return preallocationStrategy;
3648    }
3649
3650    public void setPreallocationStrategy(String preallocationStrategy) {
3651        this.preallocationStrategy = preallocationStrategy;
3652    }
3653
3654    public int getCompactAcksAfterNoGC() {
3655        return compactAcksAfterNoGC;
3656    }
3657
3658    /**
3659     * Sets the number of GC cycles where no journal logs were removed before an attempt to
3660     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
3661     * <p>
3662     * A value of -1 will disable this feature.
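     * <p>
     * For example, on a KahaDB store instance (hypothetical variable name; the setters are
     * the ones defined in this class):
     * <pre>
     * store.setEnableAckCompaction(true);  // enable the ack compaction task
     * store.setCompactAcksAfterNoGC(10);   // rewrite old acks after 10 empty GC cycles
     * </pre>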
3663     *
3664     * @param compactAcksAfterNoGC
3665     *      Number of empty GC cycles before we rewrite old ACKS.
3666     */
3667    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
3668        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
3669    }
3670
3671    /**
3672     * Returns whether Ack compaction will ignore that the store is still growing
3673     * and run more often.
3674     *
3675     * @return the compactAcksIgnoresStoreGrowth current value.
3676     */
3677    public boolean isCompactAcksIgnoresStoreGrowth() {
3678        return compactAcksIgnoresStoreGrowth;
3679    }
3680
3681    /**
3682     * Configure if Ack compaction will occur regardless of continued growth of the
3683     * journal logs, meaning that the store has not yet run out of space.  Because the
3684     * compaction operation can be costly, this value is defaulted to off and the Ack
3685     * compaction is only done when it seems that the store cannot grow any larger.
3686     *
3687     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
3688     */
3689    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
3690        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
3691    }
3692
3693    /**
3694     * Returns whether Ack compaction is enabled
3695     *
3696     * @return enableAckCompaction
3697     */
3698    public boolean isEnableAckCompaction() {
3699        return enableAckCompaction;
3700    }
3701
3702    /**
3703     * Configure whether the Ack compaction task is enabled.
3704     *
3705     * @param enableAckCompaction true to enable the ack compaction task
3706     */
3707    public void setEnableAckCompaction(boolean enableAckCompaction) {
3708        this.enableAckCompaction = enableAckCompaction;
3709    }
3710}