001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.EOFException;
024import java.io.File;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.InterruptedIOException;
028import java.io.ObjectInputStream;
029import java.io.ObjectOutputStream;
030import java.io.OutputStream;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.Date;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.LinkedHashMap;
040import java.util.LinkedHashSet;
041import java.util.LinkedList;
042import java.util.List;
043import java.util.Map;
044import java.util.Map.Entry;
045import java.util.Set;
046import java.util.SortedSet;
047import java.util.TreeMap;
048import java.util.TreeSet;
049import java.util.concurrent.ConcurrentHashMap;
050import java.util.concurrent.ConcurrentMap;
051import java.util.concurrent.Executors;
052import java.util.concurrent.ScheduledExecutorService;
053import java.util.concurrent.ThreadFactory;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.atomic.AtomicBoolean;
056import java.util.concurrent.atomic.AtomicLong;
057import java.util.concurrent.locks.ReentrantReadWriteLock;
058
059import org.apache.activemq.ActiveMQMessageAuditNoSync;
060import org.apache.activemq.broker.BrokerService;
061import org.apache.activemq.broker.BrokerServiceAware;
062import org.apache.activemq.broker.region.Destination;
063import org.apache.activemq.broker.region.Queue;
064import org.apache.activemq.broker.region.Topic;
065import org.apache.activemq.command.MessageAck;
066import org.apache.activemq.command.TransactionId;
067import org.apache.activemq.openwire.OpenWireFormat;
068import org.apache.activemq.protobuf.Buffer;
069import org.apache.activemq.store.MessageStore;
070import org.apache.activemq.store.MessageStoreStatistics;
071import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
072import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
073import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
074import org.apache.activemq.store.kahadb.data.KahaDestination;
075import org.apache.activemq.store.kahadb.data.KahaEntryType;
076import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
077import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
078import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
079import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
080import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
081import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
082import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
083import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
084import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
085import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
086import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
087import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
088import org.apache.activemq.store.kahadb.disk.index.ListIndex;
089import org.apache.activemq.store.kahadb.disk.journal.DataFile;
090import org.apache.activemq.store.kahadb.disk.journal.Journal;
091import org.apache.activemq.store.kahadb.disk.journal.Location;
092import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
093import org.apache.activemq.store.kahadb.disk.page.Page;
094import org.apache.activemq.store.kahadb.disk.page.PageFile;
095import org.apache.activemq.store.kahadb.disk.page.Transaction;
096import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
097import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
098import org.apache.activemq.store.kahadb.disk.util.Marshaller;
099import org.apache.activemq.store.kahadb.disk.util.Sequence;
100import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
101import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
102import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
103import org.apache.activemq.util.ByteSequence;
104import org.apache.activemq.util.DataByteArrayInputStream;
105import org.apache.activemq.util.DataByteArrayOutputStream;
106import org.apache.activemq.util.IOExceptionSupport;
107import org.apache.activemq.util.IOHelper;
108import org.apache.activemq.util.ServiceStopper;
109import org.apache.activemq.util.ServiceSupport;
110import org.apache.activemq.util.ThreadPoolUtils;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113
114public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
115
116    protected BrokerService brokerService;
117
118    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
119    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
120    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
121    protected static final Buffer UNMATCHED;
122    static {
123        UNMATCHED = new Buffer(new byte[]{});
124    }
125    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
126
127    static final int CLOSED_STATE = 1;
128    static final int OPEN_STATE = 2;
129    static final long NOT_ACKED = -1;
130
131    static final int VERSION = 6;
132
133    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
134
135    protected class Metadata {
136        protected Page<Metadata> page;
137        protected int state;
138        protected BTreeIndex<String, StoredDestination> destinations;
139        protected Location lastUpdate;
140        protected Location firstInProgressTransactionLocation;
141        protected Location producerSequenceIdTrackerLocation = null;
142        protected Location ackMessageFileMapLocation = null;
143        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
144        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
145        protected int version = VERSION;
146        protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
147
148        public void read(DataInput is) throws IOException {
149            state = is.readInt();
150            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
151            if (is.readBoolean()) {
152                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
153            } else {
154                lastUpdate = null;
155            }
156            if (is.readBoolean()) {
157                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
158            } else {
159                firstInProgressTransactionLocation = null;
160            }
161            try {
162                if (is.readBoolean()) {
163                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
164                } else {
165                    producerSequenceIdTrackerLocation = null;
166                }
167            } catch (EOFException expectedOnUpgrade) {
168            }
169            try {
170                version = is.readInt();
171            } catch (EOFException expectedOnUpgrade) {
172                version = 1;
173            }
174            if (version >= 5 && is.readBoolean()) {
175                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
176            } else {
177                ackMessageFileMapLocation = null;
178            }
179            try {
180                openwireVersion = is.readInt();
181            } catch (EOFException expectedOnUpgrade) {
182                openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
183            }
184            LOG.info("KahaDB is version " + version);
185        }
186
187        public void write(DataOutput os) throws IOException {
188            os.writeInt(state);
189            os.writeLong(destinations.getPageId());
190
191            if (lastUpdate != null) {
192                os.writeBoolean(true);
193                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
194            } else {
195                os.writeBoolean(false);
196            }
197
198            if (firstInProgressTransactionLocation != null) {
199                os.writeBoolean(true);
200                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
201            } else {
202                os.writeBoolean(false);
203            }
204
205            if (producerSequenceIdTrackerLocation != null) {
206                os.writeBoolean(true);
207                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
208            } else {
209                os.writeBoolean(false);
210            }
211            os.writeInt(VERSION);
212            if (ackMessageFileMapLocation != null) {
213                os.writeBoolean(true);
214                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
215            } else {
216                os.writeBoolean(false);
217            }
218            os.writeInt(this.openwireVersion);
219        }
220    }
221
222    class MetadataMarshaller extends VariableMarshaller<Metadata> {
223        @Override
224        public Metadata readPayload(DataInput dataIn) throws IOException {
225            Metadata rc = createMetadata();
226            rc.read(dataIn);
227            return rc;
228        }
229
230        @Override
231        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
232            object.write(dataOut);
233        }
234    }
235
236    protected PageFile pageFile;
237    protected Journal journal;
238    protected Metadata metadata = new Metadata();
239
240    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
241
242    protected boolean failIfDatabaseIsLocked;
243
244    protected boolean deleteAllMessages;
245    protected File directory = DEFAULT_DIRECTORY;
246    protected File indexDirectory = null;
247    protected ScheduledExecutorService scheduler;
248    private final Object schedulerLock = new Object();
249
250    protected boolean enableJournalDiskSyncs = true;
251    protected boolean archiveDataLogs;
252    protected File directoryArchive;
253    protected AtomicLong journalSize = new AtomicLong(0);
254    long checkpointInterval = 5*1000;
255    long cleanupInterval = 30*1000;
256    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
257    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
258    boolean enableIndexWriteAsync = false;
259    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
260    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
261    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
262
263    protected AtomicBoolean opened = new AtomicBoolean();
264    private boolean ignoreMissingJournalfiles = false;
265    private int indexCacheSize = 10000;
266    private boolean checkForCorruptJournalFiles = false;
267    private boolean checksumJournalFiles = true;
268    protected boolean forceRecoverIndex = false;
269    private boolean archiveCorruptedIndex = false;
270    private boolean useIndexLFRUEviction = false;
271    private float indexLFUEvictionFactor = 0.2f;
272    private boolean enableIndexDiskSyncs = true;
273    private boolean enableIndexRecoveryFile = true;
274    private boolean enableIndexPageCaching = true;
275    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
276
277    private boolean enableAckCompaction = false;
278    private int compactAcksAfterNoGC = 10;
279    private boolean compactAcksIgnoresStoreGrowth = false;
280    private int checkPointCyclesWithNoGC;
281    private int journalLogOnLastCompactionCheck;
282
283    @Override
284    public void doStart() throws Exception {
285        load();
286    }
287
288    @Override
289    public void doStop(ServiceStopper stopper) throws Exception {
290        unload();
291    }
292
293    private void loadPageFile() throws IOException {
294        this.indexLock.writeLock().lock();
295        try {
296            final PageFile pageFile = getPageFile();
297            pageFile.load();
298            pageFile.tx().execute(new Transaction.Closure<IOException>() {
299                @Override
300                public void execute(Transaction tx) throws IOException {
301                    if (pageFile.getPageCount() == 0) {
302                        // First time this is created.. Initialize the metadata
303                        Page<Metadata> page = tx.allocate();
304                        assert page.getPageId() == 0;
305                        page.set(metadata);
306                        metadata.page = page;
307                        metadata.state = CLOSED_STATE;
308                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
309
310                        tx.store(metadata.page, metadataMarshaller, true);
311                    } else {
312                        Page<Metadata> page = tx.load(0, metadataMarshaller);
313                        metadata = page.get();
314                        metadata.page = page;
315                    }
316                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
317                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
318                    metadata.destinations.load(tx);
319                }
320            });
321            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
322            // Perhaps we should just keep an index of file
323            storedDestinations.clear();
324            pageFile.tx().execute(new Transaction.Closure<IOException>() {
325                @Override
326                public void execute(Transaction tx) throws IOException {
327                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
328                        Entry<String, StoredDestination> entry = iterator.next();
329                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
330                        storedDestinations.put(entry.getKey(), sd);
331
332                        if (checkForCorruptJournalFiles) {
333                            // sanity check the index also
334                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
335                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
336                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
337                                }
338                            }
339                        }
340                    }
341                }
342            });
343            pageFile.flush();
344        } finally {
345            this.indexLock.writeLock().unlock();
346        }
347    }
348
349    private void startCheckpoint() {
350        if (checkpointInterval == 0 && cleanupInterval == 0) {
351            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
352            return;
353        }
354        synchronized (schedulerLock) {
355            if (scheduler == null) {
356                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
357
358                    @Override
359                    public Thread newThread(Runnable r) {
360                        Thread schedulerThread = new Thread(r);
361
362                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
363                        schedulerThread.setDaemon(true);
364
365                        return schedulerThread;
366                    }
367                });
368
369                // Short intervals for check-point and cleanups
370                long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
371
372                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
373            }
374        }
375    }
376
377    private final class CheckpointRunner implements Runnable {
378
379        private long lastCheckpoint = System.currentTimeMillis();
380        private long lastCleanup = System.currentTimeMillis();
381
382        @Override
383        public void run() {
384            try {
385                // Decide on cleanup vs full checkpoint here.
386                if (opened.get()) {
387                    long now = System.currentTimeMillis();
388                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
389                        checkpointCleanup(true);
390                        lastCleanup = now;
391                        lastCheckpoint = now;
392                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
393                        checkpointCleanup(false);
394                        lastCheckpoint = now;
395                    }
396                }
397            } catch (IOException ioe) {
398                LOG.error("Checkpoint failed", ioe);
399                brokerService.handleIOException(ioe);
400            } catch (Throwable e) {
401                LOG.error("Checkpoint failed", e);
402                brokerService.handleIOException(IOExceptionSupport.create(e));
403            }
404        }
405    }
406
407    public void open() throws IOException {
408        if( opened.compareAndSet(false, true) ) {
409            getJournal().start();
410            try {
411                loadPageFile();
412            } catch (Throwable t) {
413                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
414                if (LOG.isDebugEnabled()) {
415                    LOG.debug("Index load failure", t);
416                }
417                // try to recover index
418                try {
419                    pageFile.unload();
420                } catch (Exception ignore) {}
421                if (archiveCorruptedIndex) {
422                    pageFile.archive();
423                } else {
424                    pageFile.delete();
425                }
426                metadata = createMetadata();
427                //The metadata was recreated after a detect corruption so we need to
428                //reconfigure anything that was configured on the old metadata on startup
429                configureMetadata();
430                pageFile = null;
431                loadPageFile();
432            }
433            startCheckpoint();
434            recover();
435        }
436    }
437
438    public void load() throws IOException {
439        this.indexLock.writeLock().lock();
440        IOHelper.mkdirs(directory);
441        try {
442            if (deleteAllMessages) {
443                getJournal().start();
444                getJournal().delete();
445                getJournal().close();
446                journal = null;
447                getPageFile().delete();
448                LOG.info("Persistence store purged.");
449                deleteAllMessages = false;
450            }
451
452            open();
453            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
454        } finally {
455            this.indexLock.writeLock().unlock();
456        }
457    }
458
459    public void close() throws IOException, InterruptedException {
460        if( opened.compareAndSet(true, false)) {
461            checkpointLock.writeLock().lock();
462            try {
463                if (metadata.page != null) {
464                    checkpointUpdate(true);
465                }
466                pageFile.unload();
467                metadata = createMetadata();
468            } finally {
469                checkpointLock.writeLock().unlock();
470            }
471            journal.close();
472            ThreadPoolUtils.shutdownGraceful(scheduler, -1);
473            // clear the cache and journalSize on shutdown of the store
474            storeCache.clear();
475            journalSize.set(0);
476        }
477    }
478
479    public void unload() throws IOException, InterruptedException {
480        this.indexLock.writeLock().lock();
481        try {
482            if( pageFile != null && pageFile.isLoaded() ) {
483                metadata.state = CLOSED_STATE;
484                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
485
486                if (metadata.page != null) {
487                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
488                        @Override
489                        public void execute(Transaction tx) throws IOException {
490                            tx.store(metadata.page, metadataMarshaller, true);
491                        }
492                    });
493                }
494            }
495        } finally {
496            this.indexLock.writeLock().unlock();
497        }
498        close();
499    }
500
501    // public for testing
502    @SuppressWarnings("rawtypes")
503    public Location[] getInProgressTxLocationRange() {
504        Location[] range = new Location[]{null, null};
505        synchronized (inflightTransactions) {
506            if (!inflightTransactions.isEmpty()) {
507                for (List<Operation> ops : inflightTransactions.values()) {
508                    if (!ops.isEmpty()) {
509                        trackMaxAndMin(range, ops);
510                    }
511                }
512            }
513            if (!preparedTransactions.isEmpty()) {
514                for (List<Operation> ops : preparedTransactions.values()) {
515                    if (!ops.isEmpty()) {
516                        trackMaxAndMin(range, ops);
517                    }
518                }
519            }
520        }
521        return range;
522    }
523
524    @SuppressWarnings("rawtypes")
525    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
526        Location t = ops.get(0).getLocation();
527        if (range[0] == null || t.compareTo(range[0]) <= 0) {
528            range[0] = t;
529        }
530        t = ops.get(ops.size() -1).getLocation();
531        if (range[1] == null || t.compareTo(range[1]) >= 0) {
532            range[1] = t;
533        }
534    }
535
536    class TranInfo {
537        TransactionId id;
538        Location location;
539
540        class opCount {
541            int add;
542            int remove;
543        }
544        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
545
546        @SuppressWarnings("rawtypes")
547        public void track(Operation operation) {
548            if (location == null ) {
549                location = operation.getLocation();
550            }
551            KahaDestination destination;
552            boolean isAdd = false;
553            if (operation instanceof AddOperation) {
554                AddOperation add = (AddOperation) operation;
555                destination = add.getCommand().getDestination();
556                isAdd = true;
557            } else {
558                RemoveOperation removeOpperation = (RemoveOperation) operation;
559                destination = removeOpperation.getCommand().getDestination();
560            }
561            opCount opCount = destinationOpCount.get(destination);
562            if (opCount == null) {
563                opCount = new opCount();
564                destinationOpCount.put(destination, opCount);
565            }
566            if (isAdd) {
567                opCount.add++;
568            } else {
569                opCount.remove++;
570            }
571        }
572
573        @Override
574        public String toString() {
575           StringBuffer buffer = new StringBuffer();
576           buffer.append(location).append(";").append(id).append(";\n");
577           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
578               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
579           }
580           return buffer.toString();
581        }
582    }
583
584    @SuppressWarnings("rawtypes")
585    public String getTransactions() {
586
587        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
588        synchronized (inflightTransactions) {
589            if (!inflightTransactions.isEmpty()) {
590                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
591                    TranInfo info = new TranInfo();
592                    info.id = entry.getKey();
593                    for (Operation operation : entry.getValue()) {
594                        info.track(operation);
595                    }
596                    infos.add(info);
597                }
598            }
599        }
600        synchronized (preparedTransactions) {
601            if (!preparedTransactions.isEmpty()) {
602                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
603                    TranInfo info = new TranInfo();
604                    info.id = entry.getKey();
605                    for (Operation operation : entry.getValue()) {
606                        info.track(operation);
607                    }
608                    infos.add(info);
609                }
610            }
611        }
612        return infos.toString();
613    }
614
615    /**
616     * Move all the messages that were in the journal into long term storage. We
617     * just replay and do a checkpoint.
618     *
619     * @throws IOException
620     * @throws IOException
621     * @throws IllegalStateException
622     */
623    private void recover() throws IllegalStateException, IOException {
624        this.indexLock.writeLock().lock();
625        try {
626
627            long start = System.currentTimeMillis();
628            Location producerAuditPosition = recoverProducerAudit();
629            Location ackMessageFileLocation = recoverAckMessageFileMap();
630            Location lastIndoubtPosition = getRecoveryPosition();
631
632            Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
633            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
634
635            if (recoveryPosition != null) {
636                int redoCounter = 0;
637                LOG.info("Recovering from the journal @" + recoveryPosition);
638                while (recoveryPosition != null) {
639                    try {
640                        JournalCommand<?> message = load(recoveryPosition);
641                        metadata.lastUpdate = recoveryPosition;
642                        process(message, recoveryPosition, lastIndoubtPosition);
643                        redoCounter++;
644                    } catch (IOException failedRecovery) {
645                        if (isIgnoreMissingJournalfiles()) {
646                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
647                            // track this dud location
648                            journal.corruptRecoveryLocation(recoveryPosition);
649                        } else {
650                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
651                        }
652                    }
653                    recoveryPosition = journal.getNextLocation(recoveryPosition);
654                     if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
655                         LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries recovered ..");
656                     }
657                }
658                if (LOG.isInfoEnabled()) {
659                    long end = System.currentTimeMillis();
660                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
661                }
662            }
663
664            // We may have to undo some index updates.
665            pageFile.tx().execute(new Transaction.Closure<IOException>() {
666                @Override
667                public void execute(Transaction tx) throws IOException {
668                    recoverIndex(tx);
669                }
670            });
671
672            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
673            Set<TransactionId> toRollback = new HashSet<TransactionId>();
674            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
675            synchronized (inflightTransactions) {
676                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
677                    TransactionId id = it.next();
678                    if (id.isLocalTransaction()) {
679                        toRollback.add(id);
680                    } else {
681                        toDiscard.add(id);
682                    }
683                }
684                for (TransactionId tx: toRollback) {
685                    if (LOG.isDebugEnabled()) {
686                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
687                    }
688                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
689                }
690                for (TransactionId tx: toDiscard) {
691                    if (LOG.isDebugEnabled()) {
692                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
693                    }
694                    inflightTransactions.remove(tx);
695                }
696            }
697
698            synchronized (preparedTransactions) {
699                for (TransactionId txId : preparedTransactions.keySet()) {
700                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
701                }
702            }
703
704        } finally {
705            this.indexLock.writeLock().unlock();
706        }
707    }
708
709    @SuppressWarnings("unused")
710    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
711        return TransactionIdConversion.convertToLocal(tx);
712    }
713
714    private Location minimum(Location producerAuditPosition,
715            Location lastIndoubtPosition) {
716        Location min = null;
717        if (producerAuditPosition != null) {
718            min = producerAuditPosition;
719            if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
720                min = lastIndoubtPosition;
721            }
722        } else {
723            min = lastIndoubtPosition;
724        }
725        return min;
726    }
727
728    private Location recoverProducerAudit() throws IOException {
729        if (metadata.producerSequenceIdTrackerLocation != null) {
730            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
731            try {
732                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
733                int maxNumProducers = getMaxFailoverProducersToTrack();
734                int maxAuditDepth = getFailoverProducersAuditDepth();
735                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
736                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
737                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
738                return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
739            } catch (Exception e) {
740                LOG.warn("Cannot recover message audit", e);
741                return journal.getNextLocation(null);
742            }
743        } else {
744            // got no audit stored so got to recreate via replay from start of the journal
745            return journal.getNextLocation(null);
746        }
747    }
748
749    @SuppressWarnings("unchecked")
750    private Location recoverAckMessageFileMap() throws IOException {
751        if (metadata.ackMessageFileMapLocation != null) {
752            KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
753            try {
754                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
755                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
756                return journal.getNextLocation(metadata.ackMessageFileMapLocation);
757            } catch (Exception e) {
758                LOG.warn("Cannot recover ackMessageFileMap", e);
759                return journal.getNextLocation(null);
760            }
761        } else {
762            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
763            return journal.getNextLocation(null);
764        }
765    }
766
767    protected void recoverIndex(Transaction tx) throws IOException {
768        long start = System.currentTimeMillis();
769        // It is possible index updates got applied before the journal updates..
770        // in that case we need to removed references to messages that are not in the journal
771        final Location lastAppendLocation = journal.getLastAppendLocation();
772        long undoCounter=0;
773
774        // Go through all the destinations to see if they have messages past the lastAppendLocation
775        for (String key : storedDestinations.keySet()) {
776            StoredDestination sd = storedDestinations.get(key);
777
778            final ArrayList<Long> matches = new ArrayList<Long>();
779            // Find all the Locations that are >= than the last Append Location.
780            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
781                @Override
782                protected void matched(Location key, Long value) {
783                    matches.add(value);
784                }
785            });
786
787            for (Long sequenceId : matches) {
788                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
789                if (keys != null) {
790                    sd.locationIndex.remove(tx, keys.location);
791                    sd.messageIdIndex.remove(tx, keys.messageId);
792                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
793                    undoCounter++;
794                    decrementAndSubSizeToStoreStat(key, keys.location.getSize());
795                    // TODO: do we need to modify the ack positions for the pub sub case?
796                }
797            }
798        }
799
800        if (undoCounter > 0) {
801            // The rolledback operations are basically in flight journal writes.  To avoid getting
802            // these the end user should do sync writes to the journal.
803            if (LOG.isInfoEnabled()) {
804                long end = System.currentTimeMillis();
805                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
806            }
807        }
808
809        undoCounter = 0;
810        start = System.currentTimeMillis();
811
812        // Lets be extra paranoid here and verify that all the datafiles being referenced
813        // by the indexes still exists.
814
815        final SequenceSet ss = new SequenceSet();
816        for (StoredDestination sd : storedDestinations.values()) {
817            // Use a visitor to cut down the number of pages that we load
818            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
819                int last=-1;
820
821                @Override
822                public boolean isInterestedInKeysBetween(Location first, Location second) {
823                    if( first==null ) {
824                        return !ss.contains(0, second.getDataFileId());
825                    } else if( second==null ) {
826                        return true;
827                    } else {
828                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
829                    }
830                }
831
832                @Override
833                public void visit(List<Location> keys, List<Long> values) {
834                    for (Location l : keys) {
835                        int fileId = l.getDataFileId();
836                        if( last != fileId ) {
837                            ss.add(fileId);
838                            last = fileId;
839                        }
840                    }
841                }
842
843            });
844        }
845        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
846        while (!ss.isEmpty()) {
847            missingJournalFiles.add((int) ss.removeFirst());
848        }
849
850        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
851            missingJournalFiles.add(entry.getKey());
852            for (Integer i : entry.getValue()) {
853                missingJournalFiles.add(i);
854            }
855        }
856
857        missingJournalFiles.removeAll(journal.getFileMap().keySet());
858
859        if (!missingJournalFiles.isEmpty()) {
860            LOG.warn("Some journal files are missing: " + missingJournalFiles);
861        }
862
863        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
864        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
865        for (Integer missing : missingJournalFiles) {
866            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
867        }
868
869        if (checkForCorruptJournalFiles) {
870            Collection<DataFile> dataFiles = journal.getFileMap().values();
871            for (DataFile dataFile : dataFiles) {
872                int id = dataFile.getDataFileId();
873                // eof to next file id
874                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
875                Sequence seq = dataFile.getCorruptedBlocks().getHead();
876                while (seq != null) {
877                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
878                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
879                    missingPredicates.add(visitor);
880                    knownCorruption.add(visitor);
881                    seq = seq.getNext();
882                }
883            }
884        }
885
886        if (!missingPredicates.isEmpty()) {
887            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
888                final StoredDestination sd = sdEntry.getValue();
889                final ArrayList<Long> matches = new ArrayList<Long>();
890                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
891                    @Override
892                    protected void matched(Location key, Long value) {
893                        matches.add(value);
894                    }
895                });
896
897                // If some message references are affected by the missing data files...
898                if (!matches.isEmpty()) {
899
900                    // We either 'gracefully' recover dropping the missing messages or
901                    // we error out.
902                    if( ignoreMissingJournalfiles ) {
903                        // Update the index to remove the references to the missing data
904                        for (Long sequenceId : matches) {
905                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
906                            sd.locationIndex.remove(tx, keys.location);
907                            sd.messageIdIndex.remove(tx, keys.messageId);
908                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
909                            undoCounter++;
910                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
911                            // TODO: do we need to modify the ack positions for the pub sub case?
912                        }
913                    } else {
914                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected.");
915                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
916                    }
917                }
918            }
919        }
920
921        if (!ignoreMissingJournalfiles) {
922            if (!knownCorruption.isEmpty()) {
923                LOG.error("Detected corrupt journal files. " + knownCorruption);
924                throw new IOException("Detected corrupt journal files. " + knownCorruption);
925            }
926
927            if (!missingJournalFiles.isEmpty()) {
928                LOG.error("Detected missing journal files. " + missingJournalFiles);
929                throw new IOException("Detected missing journal files. " + missingJournalFiles);
930            }
931        }
932
933        if (undoCounter > 0) {
934            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
935            // should do sync writes to the journal.
936            if (LOG.isInfoEnabled()) {
937                long end = System.currentTimeMillis();
938                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
939            }
940        }
941    }
942
943    private Location nextRecoveryPosition;
944    private Location lastRecoveryPosition;
945
946    public void incrementalRecover() throws IOException {
947        this.indexLock.writeLock().lock();
948        try {
949            if( nextRecoveryPosition == null ) {
950                if( lastRecoveryPosition==null ) {
951                    nextRecoveryPosition = getRecoveryPosition();
952                } else {
953                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
954                }
955            }
956            while (nextRecoveryPosition != null) {
957                lastRecoveryPosition = nextRecoveryPosition;
958                metadata.lastUpdate = lastRecoveryPosition;
959                JournalCommand<?> message = load(lastRecoveryPosition);
960                process(message, lastRecoveryPosition, (IndexAware) null);
961                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
962            }
963        } finally {
964            this.indexLock.writeLock().unlock();
965        }
966    }
967
968    public Location getLastUpdatePosition() throws IOException {
969        return metadata.lastUpdate;
970    }
971
972    private Location getRecoveryPosition() throws IOException {
973
974        if (!this.forceRecoverIndex) {
975
976            // If we need to recover the transactions..
977            if (metadata.firstInProgressTransactionLocation != null) {
978                return metadata.firstInProgressTransactionLocation;
979            }
980
981            // Perhaps there were no transactions...
982            if( metadata.lastUpdate!=null) {
983                // Start replay at the record after the last one recorded in the index file.
984                return journal.getNextLocation(metadata.lastUpdate);
985            }
986        }
987        // This loads the first position.
988        return journal.getNextLocation(null);
989    }
990
991    protected void checkpointCleanup(final boolean cleanup) throws IOException {
992        long start;
993        this.indexLock.writeLock().lock();
994        try {
995            start = System.currentTimeMillis();
996            if( !opened.get() ) {
997                return;
998            }
999        } finally {
1000            this.indexLock.writeLock().unlock();
1001        }
1002        checkpointUpdate(cleanup);
1003        long end = System.currentTimeMillis();
1004        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1005            if (LOG.isInfoEnabled()) {
1006                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
1007            }
1008        }
1009    }
1010
1011    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
1012        int size = data.serializedSizeFramed();
1013        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
1014        os.writeByte(data.type().getNumber());
1015        data.writeFramed(os);
1016        return os.toByteSequence();
1017    }
1018
1019    // /////////////////////////////////////////////////////////////////
1020    // Methods call by the broker to update and query the store.
1021    // /////////////////////////////////////////////////////////////////
1022    public Location store(JournalCommand<?> data) throws IOException {
1023        return store(data, false, null,null);
1024    }
1025
1026    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
1027        return store(data, false, null, null, onJournalStoreComplete);
1028    }
1029
1030    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
1031        return store(data, sync, before, after, null);
1032    }
1033
1034    /**
1035     * All updated are are funneled through this method. The updates are converted
1036     * to a JournalMessage which is logged to the journal and then the data from
1037     * the JournalMessage is used to update the index just like it would be done
1038     * during a recovery process.
1039     */
1040    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
1041        try {
1042            ByteSequence sequence = toByteSequence(data);
1043            Location location;
1044
1045            checkpointLock.readLock().lock();
1046            try {
1047
1048                long start = System.currentTimeMillis();
1049                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
1050                long start2 = System.currentTimeMillis();
1051                process(data, location, before);
1052
1053                long end = System.currentTimeMillis();
1054                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1055                    if (LOG.isInfoEnabled()) {
1056                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
1057                    }
1058                }
1059            } finally {
1060                checkpointLock.readLock().unlock();
1061            }
1062
1063            if (after != null) {
1064                after.run();
1065            }
1066
1067            if (scheduler == null && opened.get()) {
1068                startCheckpoint();
1069            }
1070            return location;
1071        } catch (IOException ioe) {
1072            LOG.error("KahaDB failed to store to Journal", ioe);
1073            brokerService.handleIOException(ioe);
1074            throw ioe;
1075        }
1076    }
1077
1078    /**
1079     * Loads a previously stored JournalMessage
1080     *
1081     * @param location
1082     * @return
1083     * @throws IOException
1084     */
1085    public JournalCommand<?> load(Location location) throws IOException {
1086        long start = System.currentTimeMillis();
1087        ByteSequence data = journal.read(location);
1088        long end = System.currentTimeMillis();
1089        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1090            if (LOG.isInfoEnabled()) {
1091                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1092            }
1093        }
1094        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1095        byte readByte = is.readByte();
1096        KahaEntryType type = KahaEntryType.valueOf(readByte);
1097        if( type == null ) {
1098            try {
1099                is.close();
1100            } catch (IOException e) {}
1101            throw new IOException("Could not load journal record. Invalid location: "+location);
1102        }
1103        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1104        message.mergeFramed(is);
1105        return message;
1106    }
1107
1108    /**
1109     * do minimal recovery till we reach the last inDoubtLocation
1110     * @param data
1111     * @param location
1112     * @param inDoubtlocation
1113     * @throws IOException
1114     */
1115    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1116        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1117            process(data, location, (IndexAware) null);
1118        } else {
1119            // just recover producer audit
1120            data.visit(new Visitor() {
1121                @Override
1122                public void visit(KahaAddMessageCommand command) throws IOException {
1123                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1124                }
1125            });
1126        }
1127    }
1128
1129    // /////////////////////////////////////////////////////////////////
1130    // Journaled record processing methods. Once the record is journaled,
1131    // these methods handle applying the index updates. These may be called
1132    // from the recovery method too so they need to be idempotent
1133    // /////////////////////////////////////////////////////////////////
1134
1135    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
1136        data.visit(new Visitor() {
1137            @Override
1138            public void visit(KahaAddMessageCommand command) throws IOException {
1139                process(command, location, onSequenceAssignedCallback);
1140            }
1141
1142            @Override
1143            public void visit(KahaRemoveMessageCommand command) throws IOException {
1144                process(command, location);
1145            }
1146
1147            @Override
1148            public void visit(KahaPrepareCommand command) throws IOException {
1149                process(command, location);
1150            }
1151
1152            @Override
1153            public void visit(KahaCommitCommand command) throws IOException {
1154                process(command, location, onSequenceAssignedCallback);
1155            }
1156
1157            @Override
1158            public void visit(KahaRollbackCommand command) throws IOException {
1159                process(command, location);
1160            }
1161
1162            @Override
1163            public void visit(KahaRemoveDestinationCommand command) throws IOException {
1164                process(command, location);
1165            }
1166
1167            @Override
1168            public void visit(KahaSubscriptionCommand command) throws IOException {
1169                process(command, location);
1170            }
1171
1172            @Override
1173            public void visit(KahaProducerAuditCommand command) throws IOException {
1174                processLocation(location);
1175            }
1176
1177            @Override
1178            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
1179                processLocation(location);
1180            }
1181
1182            @Override
1183            public void visit(KahaTraceCommand command) {
1184                processLocation(location);
1185            }
1186
1187            @Override
1188            public void visit(KahaUpdateMessageCommand command) throws IOException {
1189                process(command, location);
1190            }
1191
1192            @Override
1193            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
1194                process(command, location);
1195            }
1196        });
1197    }
1198
1199    @SuppressWarnings("rawtypes")
1200    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1201        if (command.hasTransactionInfo()) {
1202            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1203            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1204        } else {
1205            this.indexLock.writeLock().lock();
1206            try {
1207                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1208                    @Override
1209                    public void execute(Transaction tx) throws IOException {
1210                        long assignedIndex = updateIndex(tx, command, location);
1211                        if (runWithIndexLock != null) {
1212                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1213                        }
1214                    }
1215                });
1216
1217            } finally {
1218                this.indexLock.writeLock().unlock();
1219            }
1220        }
1221    }
1222
1223    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1224        this.indexLock.writeLock().lock();
1225        try {
1226            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1227                @Override
1228                public void execute(Transaction tx) throws IOException {
1229                    updateIndex(tx, command, location);
1230                }
1231            });
1232        } finally {
1233            this.indexLock.writeLock().unlock();
1234        }
1235    }
1236
1237    @SuppressWarnings("rawtypes")
1238    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1239        if (command.hasTransactionInfo()) {
1240           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1241           inflightTx.add(new RemoveOperation(command, location));
1242        } else {
1243            this.indexLock.writeLock().lock();
1244            try {
1245                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1246                    @Override
1247                    public void execute(Transaction tx) throws IOException {
1248                        updateIndex(tx, command, location);
1249                    }
1250                });
1251            } finally {
1252                this.indexLock.writeLock().unlock();
1253            }
1254        }
1255    }
1256
1257    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1258        this.indexLock.writeLock().lock();
1259        try {
1260            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1261                @Override
1262                public void execute(Transaction tx) throws IOException {
1263                    updateIndex(tx, command, location);
1264                }
1265            });
1266        } finally {
1267            this.indexLock.writeLock().unlock();
1268        }
1269    }
1270
1271    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1272        this.indexLock.writeLock().lock();
1273        try {
1274            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1275                @Override
1276                public void execute(Transaction tx) throws IOException {
1277                    updateIndex(tx, command, location);
1278                }
1279            });
1280        } finally {
1281            this.indexLock.writeLock().unlock();
1282        }
1283    }
1284
1285    protected void processLocation(final Location location) {
1286        this.indexLock.writeLock().lock();
1287        try {
1288            metadata.lastUpdate = location;
1289        } finally {
1290            this.indexLock.writeLock().unlock();
1291        }
1292    }
1293
1294    @SuppressWarnings("rawtypes")
1295    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1296        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1297        List<Operation> inflightTx;
1298        synchronized (inflightTransactions) {
1299            inflightTx = inflightTransactions.remove(key);
1300            if (inflightTx == null) {
1301                inflightTx = preparedTransactions.remove(key);
1302            }
1303        }
1304        if (inflightTx == null) {
1305            // only non persistent messages in this tx
1306            if (before != null) {
1307                before.sequenceAssignedWithIndexLocked(-1);
1308            }
1309            return;
1310        }
1311
1312        final List<Operation> messagingTx = inflightTx;
1313        indexLock.writeLock().lock();
1314        try {
1315            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1316                @Override
1317                public void execute(Transaction tx) throws IOException {
1318                    for (Operation op : messagingTx) {
1319                        op.execute(tx);
1320                    }
1321                }
1322            });
1323            metadata.lastUpdate = location;
1324        } finally {
1325            indexLock.writeLock().unlock();
1326        }
1327    }
1328
1329    @SuppressWarnings("rawtypes")
1330    protected void process(KahaPrepareCommand command, Location location) {
1331        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1332        synchronized (inflightTransactions) {
1333            List<Operation> tx = inflightTransactions.remove(key);
1334            if (tx != null) {
1335                preparedTransactions.put(key, tx);
1336            }
1337        }
1338    }
1339
1340    @SuppressWarnings("rawtypes")
1341    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1342        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1343        List<Operation> updates = null;
1344        synchronized (inflightTransactions) {
1345            updates = inflightTransactions.remove(key);
1346            if (updates == null) {
1347                updates = preparedTransactions.remove(key);
1348            }
1349        }
1350    }
1351
1352    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1353        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1354        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1355            // Mark the current journal file as a compacted file so that gc checks can skip
1356            // over logs that are smaller compaction type logs.
1357            DataFile current = journal.getDataFileById(location.getDataFileId());
1358            current.setTypeCode(command.getRewriteType());
1359
1360            // Move offset so that next location read jumps to next file.
1361            location.setOffset(journalMaxFileLength);
1362        }
1363    }
1364
1365    // /////////////////////////////////////////////////////////////////
1366    // These methods do the actual index updates.
1367    // /////////////////////////////////////////////////////////////////
1368
1369    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1370    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1371
1372    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1373        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1374
1375        // Skip adding the message to the index if this is a topic and there are
1376        // no subscriptions.
1377        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1378            return -1;
1379        }
1380
1381        // Add the message.
1382        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1383        long id = sd.orderIndex.getNextMessageId();
1384        Long previous = sd.locationIndex.put(tx, location, id);
1385        if (previous == null) {
1386            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1387            if (previous == null) {
1388                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1389                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1390                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1391                    addAckLocationForNewMessage(tx, sd, id);
1392                }
1393                metadata.lastUpdate = location;
1394            } else {
1395
1396                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1397                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
1398                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
1399                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1400                }
1401                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1402                sd.locationIndex.remove(tx, location);
1403                id = -1;
1404            }
1405        } else {
1406            // restore the previous value.. Looks like this was a redo of a previously
1407            // added message. We don't want to assign it a new id as the other indexes would
1408            // be wrong..
1409            sd.locationIndex.put(tx, location, previous);
1410            // ensure sequence is not broken
1411            sd.orderIndex.revertNextMessageId();
1412            metadata.lastUpdate = location;
1413        }
1414        // record this id in any event, initial send or recovery
1415        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1416
1417       return id;
1418    }
1419
1420    void trackPendingAdd(KahaDestination destination, Long seq) {
1421        StoredDestination sd = storedDestinations.get(key(destination));
1422        if (sd != null) {
1423            sd.trackPendingAdd(seq);
1424        }
1425    }
1426
1427    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1428        StoredDestination sd = storedDestinations.get(key(destination));
1429        if (sd != null) {
1430            sd.trackPendingAddComplete(seq);
1431        }
1432    }
1433
1434    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1435        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1436        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1437
1438        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1439        if (id != null) {
1440            MessageKeys previousKeys = sd.orderIndex.put(
1441                    tx,
1442                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1443                    id,
1444                    new MessageKeys(command.getMessageId(), location)
1445            );
1446            sd.locationIndex.put(tx, location, id);
1447            incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1448            // on first update previous is original location, on recovery/replay it may be the updated location
1449            if(previousKeys != null && !previousKeys.location.equals(location)) {
1450                sd.locationIndex.remove(tx, previousKeys.location);
1451                decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
1452            }
1453            metadata.lastUpdate = location;
1454        } else {
1455            //Add the message if it can't be found
1456            this.updateIndex(tx, command, location);
1457        }
1458    }
1459
1460    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1461        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1462        if (!command.hasSubscriptionKey()) {
1463
1464            // In the queue case we just remove the message from the index..
1465            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1466            if (sequenceId != null) {
1467                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1468                if (keys != null) {
1469                    sd.locationIndex.remove(tx, keys.location);
1470                    decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
1471                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1472                    metadata.lastUpdate = ackLocation;
1473                }  else if (LOG.isDebugEnabled()) {
1474                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1475                }
1476            } else if (LOG.isDebugEnabled()) {
1477                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1478            }
1479        } else {
1480            // In the topic case we need remove the message once it's been acked
1481            // by all the subs
1482            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1483
1484            // Make sure it's a valid message id...
1485            if (sequence != null) {
1486                String subscriptionKey = command.getSubscriptionKey();
1487                if (command.getAck() != UNMATCHED) {
1488                    sd.orderIndex.get(tx, sequence);
1489                    byte priority = sd.orderIndex.lastGetPriority();
1490                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1491                }
1492
1493                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1494                if (keys != null) {
1495                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1496                }
1497                // The following method handles deleting un-referenced messages.
1498                removeAckLocation(command, tx, sd, subscriptionKey, sequence);
1499                metadata.lastUpdate = ackLocation;
1500            } else if (LOG.isDebugEnabled()) {
1501                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1502            }
1503
1504        }
1505    }
1506
1507    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1508        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1509        if (referenceFileIds == null) {
1510            referenceFileIds = new HashSet<Integer>();
1511            referenceFileIds.add(messageLocation.getDataFileId());
1512            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1513        } else {
1514            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1515            if (!referenceFileIds.contains(id)) {
1516                referenceFileIds.add(id);
1517            }
1518        }
1519    }
1520
1521    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1522        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1523        sd.orderIndex.remove(tx);
1524
1525        sd.locationIndex.clear(tx);
1526        sd.locationIndex.unload(tx);
1527        tx.free(sd.locationIndex.getPageId());
1528
1529        sd.messageIdIndex.clear(tx);
1530        sd.messageIdIndex.unload(tx);
1531        tx.free(sd.messageIdIndex.getPageId());
1532
1533        if (sd.subscriptions != null) {
1534            sd.subscriptions.clear(tx);
1535            sd.subscriptions.unload(tx);
1536            tx.free(sd.subscriptions.getPageId());
1537
1538            sd.subscriptionAcks.clear(tx);
1539            sd.subscriptionAcks.unload(tx);
1540            tx.free(sd.subscriptionAcks.getPageId());
1541
1542            sd.ackPositions.clear(tx);
1543            sd.ackPositions.unload(tx);
1544            tx.free(sd.ackPositions.getHeadPageId());
1545
1546            sd.subLocations.clear(tx);
1547            sd.subLocations.unload(tx);
1548            tx.free(sd.subLocations.getHeadPageId());
1549        }
1550
1551        String key = key(command.getDestination());
1552        storedDestinations.remove(key);
1553        metadata.destinations.remove(tx, key);
1554        clearStoreStats(command.getDestination());
1555        storeCache.remove(key(command.getDestination()));
1556    }
1557
1558    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1559        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1560        final String subscriptionKey = command.getSubscriptionKey();
1561
1562        // If set then we are creating it.. otherwise we are destroying the sub
1563        if (command.hasSubscriptionInfo()) {
1564            Location existing = sd.subLocations.get(tx, subscriptionKey);
1565            if (existing != null && existing.compareTo(location) == 0) {
1566                // replay on recovery, ignore
1567                LOG.trace("ignoring journal replay of replay of sub from: " + location);
1568                return;
1569            }
1570
1571            sd.subscriptions.put(tx, subscriptionKey, command);
1572            sd.subLocations.put(tx, subscriptionKey, location);
1573            long ackLocation=NOT_ACKED;
1574            if (!command.getRetroactive()) {
1575                ackLocation = sd.orderIndex.nextMessageId-1;
1576            } else {
1577                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1578            }
1579            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1580            sd.subscriptionCache.add(subscriptionKey);
1581        } else {
1582            // delete the sub...
1583            sd.subscriptions.remove(tx, subscriptionKey);
1584            sd.subLocations.remove(tx, subscriptionKey);
1585            sd.subscriptionAcks.remove(tx, subscriptionKey);
1586            sd.subscriptionCache.remove(subscriptionKey);
1587            removeAckLocationsForSub(command, tx, sd, subscriptionKey);
1588
1589            if (sd.subscriptions.isEmpty(tx)) {
1590                // remove the stored destination
1591                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1592                removeDestinationCommand.setDestination(command.getDestination());
1593                updateIndex(tx, removeDestinationCommand, null);
1594                clearStoreStats(command.getDestination());
1595            }
1596        }
1597    }
1598
1599    private void checkpointUpdate(final boolean cleanup) throws IOException {
1600        checkpointLock.writeLock().lock();
1601        try {
1602            this.indexLock.writeLock().lock();
1603            try {
1604                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1605                    @Override
1606                    public void execute(Transaction tx) throws IOException {
1607                        checkpointUpdate(tx, cleanup);
1608                    }
1609                });
1610            } finally {
1611                this.indexLock.writeLock().unlock();
1612            }
1613
1614        } finally {
1615            checkpointLock.writeLock().unlock();
1616        }
1617    }
1618
1619    /**
1620     * @param tx
1621     * @throws IOException
1622     */
1623    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1624        LOG.debug("Checkpoint started.");
1625
1626        // reflect last update exclusive of current checkpoint
1627        Location lastUpdate = metadata.lastUpdate;
1628
1629        metadata.state = OPEN_STATE;
1630        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1631        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1632        Location[] inProgressTxRange = getInProgressTxLocationRange();
1633        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1634        tx.store(metadata.page, metadataMarshaller, true);
1635        pageFile.flush();
1636
1637        if (cleanup) {
1638
1639            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1640            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1641
1642            if (LOG.isTraceEnabled()) {
1643                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1644            }
1645
1646            if (lastUpdate != null) {
1647                gcCandidateSet.remove(lastUpdate.getDataFileId());
1648            }
1649
1650            // Don't GC files under replication
1651            if( journalFilesBeingReplicated!=null ) {
1652                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1653            }
1654
1655            if (metadata.producerSequenceIdTrackerLocation != null) {
1656                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1657                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1658                    // rewrite so we don't prevent gc
1659                    metadata.producerSequenceIdTracker.setModified(true);
1660                    if (LOG.isTraceEnabled()) {
1661                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1662                    }
1663                }
1664                gcCandidateSet.remove(dataFileId);
1665                if (LOG.isTraceEnabled()) {
1666                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet);
1667                }
1668            }
1669
1670            if (metadata.ackMessageFileMapLocation != null) {
1671                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1672                gcCandidateSet.remove(dataFileId);
1673                if (LOG.isTraceEnabled()) {
1674                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
1675                }
1676            }
1677
1678            // Don't GC files referenced by in-progress tx
1679            if (inProgressTxRange[0] != null) {
1680                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1681                    gcCandidateSet.remove(pendingTx);
1682                }
1683            }
1684            if (LOG.isTraceEnabled()) {
1685                LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1686            }
1687
1688            // Go through all the destinations to see if any of them can remove GC candidates.
1689            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1690                if( gcCandidateSet.isEmpty() ) {
1691                    break;
1692                }
1693
1694                // Use a visitor to cut down the number of pages that we load
1695                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1696                    int last=-1;
1697                    @Override
1698                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1699                        if( first==null ) {
1700                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1701                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1702                                subset.remove(second.getDataFileId());
1703                            }
1704                            return !subset.isEmpty();
1705                        } else if( second==null ) {
1706                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1707                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1708                                subset.remove(first.getDataFileId());
1709                            }
1710                            return !subset.isEmpty();
1711                        } else {
1712                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1713                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1714                                subset.remove(first.getDataFileId());
1715                            }
1716                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1717                                subset.remove(second.getDataFileId());
1718                            }
1719                            return !subset.isEmpty();
1720                        }
1721                    }
1722
1723                    @Override
1724                    public void visit(List<Location> keys, List<Long> values) {
1725                        for (Location l : keys) {
1726                            int fileId = l.getDataFileId();
1727                            if( last != fileId ) {
1728                                gcCandidateSet.remove(fileId);
1729                                last = fileId;
1730                            }
1731                        }
1732                    }
1733                });
1734
1735                // Durable Subscription
1736                if (entry.getValue().subLocations != null) {
1737                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1738                    while (iter.hasNext()) {
1739                        Entry<String, Location> subscription = iter.next();
1740                        int dataFileId = subscription.getValue().getDataFileId();
1741
1742                        // Move subscription along if it has no outstanding messages that need ack'd
1743                        // and its in the last log file in the journal.
1744                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1745                            final StoredDestination destination = entry.getValue();
1746                            final String subscriptionKey = subscription.getKey();
1747                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1748
1749                            // When pending is size one that is the next message Id meaning there
1750                            // are no pending messages currently.
1751                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1752                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1753
1754                                if (LOG.isTraceEnabled()) {
1755                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1756                                }
1757
1758                                final KahaSubscriptionCommand kahaSub =
1759                                    destination.subscriptions.get(tx, subscriptionKey);
1760                                destination.subLocations.put(
1761                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1762
1763                                // Skips the remove from candidates if we rewrote the subscription
1764                                // in order to prevent duplicate subscription commands on recover.
1765                                // If another subscription is on the same file and isn't rewritten
1766                                // than it will remove the file from the set.
1767                                continue;
1768                            }
1769                        }
1770
1771                        gcCandidateSet.remove(dataFileId);
1772                    }
1773                }
1774
1775                if (LOG.isTraceEnabled()) {
1776                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1777                }
1778            }
1779
1780            // check we are not deleting file with ack for in-use journal files
1781            if (LOG.isTraceEnabled()) {
1782                LOG.trace("gc candidates: " + gcCandidateSet);
1783                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1784            }
1785
1786            boolean ackMessageFileMapMod = false;
1787            Iterator<Integer> candidates = gcCandidateSet.iterator();
1788            while (candidates.hasNext()) {
1789                Integer candidate = candidates.next();
1790                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1791                if (referencedFileIds != null) {
1792                    for (Integer referencedFileId : referencedFileIds) {
1793                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1794                            // active file that is not targeted for deletion is referenced so don't delete
1795                            candidates.remove();
1796                            break;
1797                        }
1798                    }
1799                    if (gcCandidateSet.contains(candidate)) {
1800                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1801                    } else {
1802                        if (LOG.isTraceEnabled()) {
1803                            LOG.trace("not removing data file: " + candidate
1804                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1805                        }
1806                    }
1807                }
1808            }
1809
1810            if (!gcCandidateSet.isEmpty()) {
1811                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1812                journal.removeDataFiles(gcCandidateSet);
1813                for (Integer candidate : gcCandidateSet) {
1814                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1815                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1816                    }
1817                }
1818                if (ackMessageFileMapMod) {
1819                    checkpointUpdate(tx, false);
1820                }
1821            } else if (isEnableAckCompaction()) {
1822                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1823                    // First check length of journal to make sure it makes sense to even try.
1824                    //
1825                    // If there is only one journal file with Acks in it we don't need to move
1826                    // it since it won't be chained to any later logs.
1827                    //
1828                    // If the logs haven't grown since the last time then we need to compact
1829                    // otherwise there seems to still be room for growth and we don't need to incur
1830                    // the overhead.  Depending on configuration this check can be avoided and
1831                    // Ack compaction will run any time the store has not GC'd a journal file in
1832                    // the configured amount of cycles.
1833                    if (metadata.ackMessageFileMap.size() > 1 &&
1834                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1835
1836                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
1837                        try {
1838                            scheduler.execute(new AckCompactionRunner());
1839                        } catch (Exception ex) {
1840                            LOG.warn("Error on queueing the Ack Compactor", ex);
1841                        }
1842                    } else {
1843                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1844                    }
1845
1846                    checkPointCyclesWithNoGC = 0;
1847                } else {
1848                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1849                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1850                }
1851
1852                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
1853            }
1854        }
1855
1856        LOG.debug("Checkpoint done.");
1857    }
1858
1859    private final class AckCompactionRunner implements Runnable {
1860
1861        @Override
1862        public void run() {
1863            // Lock index to capture the ackMessageFileMap data
1864            indexLock.writeLock().lock();
1865
1866            // Map keys might not be sorted, find the earliest log file to forward acks
1867            // from and move only those, future cycles can chip away at more as needed.
1868            // We won't move files that are themselves rewritten on a previous compaction.
1869            List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
1870            Collections.sort(journalFileIds);
1871            int journalToAdvance = -1;
1872            for (Integer journalFileId : journalFileIds) {
1873                DataFile current = journal.getDataFileById(journalFileId);
1874                if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
1875                    journalToAdvance = journalFileId;
1876                    break;
1877                }
1878            }
1879
1880            // Check if we found one, or if we only found the current file being written to.
1881            if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
1882                return;
1883            }
1884
1885            Set<Integer> journalLogsReferenced =
1886                new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
1887
1888            indexLock.writeLock().unlock();
1889
1890            try {
1891                // Background rewrite of the old acks
1892                forwardAllAcks(journalToAdvance, journalLogsReferenced);
1893
1894                // Checkpoint with changes from the ackMessageFileMap
1895                checkpointUpdate(false);
1896            } catch (IOException ioe) {
1897                LOG.error("Checkpoint failed", ioe);
1898                brokerService.handleIOException(ioe);
1899            } catch (Throwable e) {
1900                LOG.error("Checkpoint failed", e);
1901                brokerService.handleIOException(IOExceptionSupport.create(e));
1902            }
1903        }
1904    }
1905
1906    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
1907        LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
1908
1909        DataFile forwardsFile = journal.reserveDataFile();
1910        LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
1911
1912        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
1913
1914        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
1915            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
1916            compactionMarker.setSourceDataFileId(journalToRead);
1917            compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE);
1918
1919            ByteSequence payload = toByteSequence(compactionMarker);
1920            appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
1921            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
1922
1923            Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0));
1924            while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
1925                JournalCommand<?> command = null;
1926                try {
1927                    command = load(nextLocation);
1928                } catch (IOException ex) {
1929                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
1930                }
1931
1932                if (command != null && command instanceof KahaRemoveMessageCommand) {
1933                    payload = toByteSequence(command);
1934                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
1935                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
1936                }
1937
1938                nextLocation = journal.getNextLocation(nextLocation);
1939            }
1940        }
1941
1942        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
1943
1944        // Lock index while we update the ackMessageFileMap.
1945        indexLock.writeLock().lock();
1946
1947        // Update the ack map with the new locations of the acks
1948        for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
1949            Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
1950            if (referenceFileIds == null) {
1951                referenceFileIds = new HashSet<Integer>();
1952                referenceFileIds.addAll(entry.getValue());
1953                metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
1954            } else {
1955                referenceFileIds.addAll(entry.getValue());
1956            }
1957        }
1958
1959        // remove the old location data from the ack map so that the old journal log file can
1960        // be removed on next GC.
1961        metadata.ackMessageFileMap.remove(journalToRead);
1962
1963        indexLock.writeLock().unlock();
1964
1965        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
1966    }
1967
1968    final Runnable nullCompletionCallback = new Runnable() {
1969        @Override
1970        public void run() {
1971        }
1972    };
1973
1974    private Location checkpointProducerAudit() throws IOException {
1975        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
1976            ByteArrayOutputStream baos = new ByteArrayOutputStream();
1977            ObjectOutputStream oout = new ObjectOutputStream(baos);
1978            oout.writeObject(metadata.producerSequenceIdTracker);
1979            oout.flush();
1980            oout.close();
1981            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1982            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1983            try {
1984                location.getLatch().await();
1985            } catch (InterruptedException e) {
1986                throw new InterruptedIOException(e.toString());
1987            }
1988            return location;
1989        }
1990        return metadata.producerSequenceIdTrackerLocation;
1991    }
1992
1993    private Location checkpointAckMessageFileMap() throws IOException {
1994        ByteArrayOutputStream baos = new ByteArrayOutputStream();
1995        ObjectOutputStream oout = new ObjectOutputStream(baos);
1996        oout.writeObject(metadata.ackMessageFileMap);
1997        oout.flush();
1998        oout.close();
1999        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2000        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2001        try {
2002            location.getLatch().await();
2003        } catch (InterruptedException e) {
2004            throw new InterruptedIOException(e.toString());
2005        }
2006        return location;
2007    }
2008
2009    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2010
2011        ByteSequence sequence = toByteSequence(subscription);
2012        Location location = journal.write(sequence, nullCompletionCallback) ;
2013
2014        try {
2015            location.getLatch().await();
2016        } catch (InterruptedException e) {
2017            throw new InterruptedIOException(e.toString());
2018        }
2019        return location;
2020    }
2021
2022    public HashSet<Integer> getJournalFilesBeingReplicated() {
2023        return journalFilesBeingReplicated;
2024    }
2025
2026    // /////////////////////////////////////////////////////////////////
2027    // StoredDestination related implementation methods.
2028    // /////////////////////////////////////////////////////////////////
2029
2030    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
2031
2032    static class MessageKeys {
2033        final String messageId;
2034        final Location location;
2035
2036        public MessageKeys(String messageId, Location location) {
2037            this.messageId=messageId;
2038            this.location=location;
2039        }
2040
2041        @Override
2042        public String toString() {
2043            return "["+messageId+","+location+"]";
2044        }
2045    }
2046
2047    protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2048        final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
2049
2050        @Override
2051        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2052            return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
2053        }
2054
2055        @Override
2056        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2057            dataOut.writeUTF(object.messageId);
2058            locationSizeMarshaller.writePayload(object.location, dataOut);
2059        }
2060    }
2061
2062    class LastAck {
2063        long lastAckedSequence;
2064        byte priority;
2065
2066        public LastAck(LastAck source) {
2067            this.lastAckedSequence = source.lastAckedSequence;
2068            this.priority = source.priority;
2069        }
2070
2071        public LastAck() {
2072            this.priority = MessageOrderIndex.HI;
2073        }
2074
2075        public LastAck(long ackLocation) {
2076            this.lastAckedSequence = ackLocation;
2077            this.priority = MessageOrderIndex.LO;
2078        }
2079
2080        public LastAck(long ackLocation, byte priority) {
2081            this.lastAckedSequence = ackLocation;
2082            this.priority = priority;
2083        }
2084
2085        @Override
2086        public String toString() {
2087            return "[" + lastAckedSequence + ":" + priority + "]";
2088        }
2089    }
2090
2091    protected class LastAckMarshaller implements Marshaller<LastAck> {
2092
2093        @Override
2094        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2095            dataOut.writeLong(object.lastAckedSequence);
2096            dataOut.writeByte(object.priority);
2097        }
2098
2099        @Override
2100        public LastAck readPayload(DataInput dataIn) throws IOException {
2101            LastAck lastAcked = new LastAck();
2102            lastAcked.lastAckedSequence = dataIn.readLong();
2103            if (metadata.version >= 3) {
2104                lastAcked.priority = dataIn.readByte();
2105            }
2106            return lastAcked;
2107        }
2108
2109        @Override
2110        public int getFixedSize() {
2111            return 9;
2112        }
2113
2114        @Override
2115        public LastAck deepCopy(LastAck source) {
2116            return new LastAck(source);
2117        }
2118
2119        @Override
2120        public boolean isDeepCopySupported() {
2121            return true;
2122        }
2123    }
2124
2125    class StoredDestination {
2126
2127        MessageOrderIndex orderIndex = new MessageOrderIndex();
2128        BTreeIndex<Location, Long> locationIndex;
2129        BTreeIndex<String, Long> messageIdIndex;
2130
2131        // These bits are only set for Topics
2132        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
2133        BTreeIndex<String, LastAck> subscriptionAcks;
2134        HashMap<String, MessageOrderCursor> subscriptionCursors;
2135        ListIndex<String, SequenceSet> ackPositions;
2136        ListIndex<String, Location> subLocations;
2137
2138        // Transient data used to track which Messages are no longer needed.
2139        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
2140        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
2141
2142        public void trackPendingAdd(Long seq) {
2143            orderIndex.trackPendingAdd(seq);
2144        }
2145
2146        public void trackPendingAddComplete(Long seq) {
2147            orderIndex.trackPendingAddComplete(seq);
2148        }
2149
2150        @Override
2151        public String toString() {
2152            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
2153        }
2154    }
2155
2156    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
2157
2158        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
2159
2160        @Override
2161        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
2162            final StoredDestination value = new StoredDestination();
2163            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2164            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
2165            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
2166
2167            if (dataIn.readBoolean()) {
2168                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
2169                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
2170                if (metadata.version >= 4) {
2171                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
2172                } else {
2173                    // upgrade
2174                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2175                        @Override
2176                        public void execute(Transaction tx) throws IOException {
2177                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
2178
2179                            if (metadata.version >= 3) {
2180                                // migrate
2181                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
2182                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
2183                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
2184                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
2185                                oldAckPositions.load(tx);
2186
2187
2188                                // Do the initial build of the data in memory before writing into the store
2189                                // based Ack Positions List to avoid a lot of disk thrashing.
2190                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
2191                                while (iterator.hasNext()) {
2192                                    Entry<Long, HashSet<String>> entry = iterator.next();
2193
2194                                    for(String subKey : entry.getValue()) {
2195                                        SequenceSet pendingAcks = temp.get(subKey);
2196                                        if (pendingAcks == null) {
2197                                            pendingAcks = new SequenceSet();
2198                                            temp.put(subKey, pendingAcks);
2199                                        }
2200
2201                                        pendingAcks.add(entry.getKey());
2202                                    }
2203                                }
2204                            }
2205                            // Now move the pending messages to ack data into the store backed
2206                            // structure.
2207                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2208                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2209                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2210                            value.ackPositions.load(tx);
2211                            for(String subscriptionKey : temp.keySet()) {
2212                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
2213                            }
2214
2215                        }
2216                    });
2217                }
2218
2219                if (metadata.version >= 5) {
2220                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
2221                } else {
2222                    // upgrade
2223                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2224                        @Override
2225                        public void execute(Transaction tx) throws IOException {
2226                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2227                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2228                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2229                            value.subLocations.load(tx);
2230                        }
2231                    });
2232                }
2233            }
2234            if (metadata.version >= 2) {
2235                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2236                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2237            } else {
2238                // upgrade
2239                pageFile.tx().execute(new Transaction.Closure<IOException>() {
2240                    @Override
2241                    public void execute(Transaction tx) throws IOException {
2242                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2243                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2244                        value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2245                        value.orderIndex.lowPriorityIndex.load(tx);
2246
2247                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2248                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2249                        value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2250                        value.orderIndex.highPriorityIndex.load(tx);
2251                    }
2252                });
2253            }
2254
2255            return value;
2256        }
2257
2258        @Override
2259        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
2260            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
2261            dataOut.writeLong(value.locationIndex.getPageId());
2262            dataOut.writeLong(value.messageIdIndex.getPageId());
2263            if (value.subscriptions != null) {
2264                dataOut.writeBoolean(true);
2265                dataOut.writeLong(value.subscriptions.getPageId());
2266                dataOut.writeLong(value.subscriptionAcks.getPageId());
2267                dataOut.writeLong(value.ackPositions.getHeadPageId());
2268                dataOut.writeLong(value.subLocations.getHeadPageId());
2269            } else {
2270                dataOut.writeBoolean(false);
2271            }
2272            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
2273            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
2274        }
2275    }
2276
2277    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2278        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2279
2280        @Override
2281        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2282            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2283            rc.mergeFramed((InputStream)dataIn);
2284            return rc;
2285        }
2286
2287        @Override
2288        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2289            object.writeFramed((OutputStream)dataOut);
2290        }
2291    }
2292
2293    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2294        String key = key(destination);
2295        StoredDestination rc = storedDestinations.get(key);
2296        if (rc == null) {
2297            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2298            rc = loadStoredDestination(tx, key, topic);
2299            // Cache it. We may want to remove/unload destinations from the
2300            // cache that are not used for a while
2301            // to reduce memory usage.
2302            storedDestinations.put(key, rc);
2303        }
2304        return rc;
2305    }
2306
2307    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2308        String key = key(destination);
2309        StoredDestination rc = storedDestinations.get(key);
2310        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2311            rc = getStoredDestination(destination, tx);
2312        }
2313        return rc;
2314    }
2315
2316    /**
2317     * @param tx
2318     * @param key
2319     * @param topic
2320     * @return
2321     * @throws IOException
2322     */
2323    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2324        // Try to load the existing indexes..
2325        StoredDestination rc = metadata.destinations.get(tx, key);
2326        if (rc == null) {
2327            // Brand new destination.. allocate indexes for it.
2328            rc = new StoredDestination();
2329            rc.orderIndex.allocate(tx);
2330            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
2331            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
2332
2333            if (topic) {
2334                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
2335                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
2336                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2337                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2338            }
2339            metadata.destinations.put(tx, key, rc);
2340        }
2341
2342        // Configure the marshalers and load.
2343        rc.orderIndex.load(tx);
2344
2345        // Figure out the next key using the last entry in the destination.
2346        rc.orderIndex.configureLast(tx);
2347
2348        rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
2349        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2350        rc.locationIndex.load(tx);
2351
2352        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2353        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2354        rc.messageIdIndex.load(tx);
2355
2356        //go through an upgrade old index if older than version 6
2357        if (metadata.version < 6) {
2358            for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) {
2359                Entry<Location, Long> entry = iterator.next();
2360                // modify so it is upgraded
2361                rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
2362            }
2363            //upgrade the order index
2364            for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) {
2365                Entry<Long, MessageKeys> entry = iterator.next();
2366                //call get so that the last priority is updated
2367                rc.orderIndex.get(tx, entry.getKey());
2368                rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
2369            }
2370        }
2371
2372        // If it was a topic...
2373        if (topic) {
2374
2375            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2376            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2377            rc.subscriptions.load(tx);
2378
2379            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2380            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2381            rc.subscriptionAcks.load(tx);
2382
2383            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2384            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2385            rc.ackPositions.load(tx);
2386
2387            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2388            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2389            rc.subLocations.load(tx);
2390
2391            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
2392
2393            if (metadata.version < 3) {
2394
2395                // on upgrade need to fill ackLocation with available messages past last ack
2396                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2397                    Entry<String, LastAck> entry = iterator.next();
2398                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2399                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2400                        Long sequence = orderIterator.next().getKey();
2401                        addAckLocation(tx, rc, sequence, entry.getKey());
2402                    }
2403                    // modify so it is upgraded
2404                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2405                }
2406            }
2407
2408            // Configure the message references index
2409            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2410            while (subscriptions.hasNext()) {
2411                Entry<String, SequenceSet> subscription = subscriptions.next();
2412                SequenceSet pendingAcks = subscription.getValue();
2413                if (pendingAcks != null && !pendingAcks.isEmpty()) {
2414                    Long lastPendingAck = pendingAcks.getTail().getLast();
2415                    for (Long sequenceId : pendingAcks) {
2416                        Long current = rc.messageReferences.get(sequenceId);
2417                        if (current == null) {
2418                            current = new Long(0);
2419                        }
2420
2421                        // We always add a trailing empty entry for the next position to start from
2422                        // so we need to ensure we don't count that as a message reference on reload.
2423                        if (!sequenceId.equals(lastPendingAck)) {
2424                            current = current.longValue() + 1;
2425                        } else {
2426                            current = Long.valueOf(0L);
2427                        }
2428
2429                        rc.messageReferences.put(sequenceId, current);
2430                    }
2431                }
2432            }
2433
2434            // Configure the subscription cache
2435            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2436                Entry<String, LastAck> entry = iterator.next();
2437                rc.subscriptionCache.add(entry.getKey());
2438            }
2439
2440            if (rc.orderIndex.nextMessageId == 0) {
2441                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2442                if (!rc.subscriptionAcks.isEmpty(tx)) {
2443                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2444                        Entry<String, LastAck> entry = iterator.next();
2445                        rc.orderIndex.nextMessageId =
2446                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2447                    }
2448                }
2449            } else {
2450                // update based on ackPositions for unmatched, last entry is always the next
2451                if (!rc.messageReferences.isEmpty()) {
2452                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
2453                    rc.orderIndex.nextMessageId =
2454                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
2455                }
2456            }
2457        }
2458
2459        if (metadata.version < VERSION) {
2460            // store again after upgrade
2461            metadata.destinations.put(tx, key, rc);
2462        }
2463        return rc;
2464    }
2465
2466    /**
2467     * Clear the counter for the destination, if one exists.
2468     *
2469     * @param kahaDestination
2470     */
2471    protected void clearStoreStats(KahaDestination kahaDestination) {
2472        MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination));
2473        if (storeStats != null) {
2474            storeStats.reset();
2475        }
2476    }
2477
2478    /**
2479     * Update MessageStoreStatistics
2480     *
2481     * @param kahaDestination
2482     * @param size
2483     */
2484    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
2485        incrementAndAddSizeToStoreStat(key(kahaDestination), size);
2486    }
2487
2488    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
2489        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2490        if (storeStats != null) {
2491            storeStats.getMessageCount().increment();
2492            if (size > 0) {
2493                storeStats.getMessageSize().addSize(size);
2494            }
2495        }
2496    }
2497
2498    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
2499        decrementAndSubSizeToStoreStat(key(kahaDestination), size);
2500    }
2501
2502    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
2503        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2504        if (storeStats != null) {
2505            storeStats.getMessageCount().decrement();
2506            if (size > 0) {
2507                storeStats.getMessageSize().addSize(-size);
2508            }
2509        }
2510    }
2511
2512    /**
2513     * This is a map to cache DestinationStatistics for a specific
2514     * KahaDestination key
2515     */
2516    protected final ConcurrentMap<String, MessageStore> storeCache =
2517            new ConcurrentHashMap<String, MessageStore>();
2518
2519    /**
2520     * Locate the storeMessageSize counter for this KahaDestination
2521     * @param kahaDestination
2522     * @return
2523     */
2524    protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
2525        MessageStoreStatistics storeStats = null;
2526        try {
2527            MessageStore messageStore = storeCache.get(kahaDestKey);
2528            if (messageStore != null) {
2529                storeStats = messageStore.getMessageStoreStatistics();
2530            }
2531        } catch (Exception e1) {
2532             LOG.error("Getting size counter of destination failed", e1);
2533        }
2534
2535        return storeStats;
2536    }
2537
2538    /**
2539     * Determine whether this Destination matches the DestinationType
2540     *
2541     * @param destination
2542     * @param type
2543     * @return
2544     */
2545    protected boolean matchType(Destination destination,
2546            KahaDestination.DestinationType type) {
2547        if (destination instanceof Topic
2548                && type.equals(KahaDestination.DestinationType.TOPIC)) {
2549            return true;
2550        } else if (destination instanceof Queue
2551                && type.equals(KahaDestination.DestinationType.QUEUE)) {
2552            return true;
2553        }
2554        return false;
2555    }
2556
2557    class LocationSizeMarshaller implements Marshaller<Location> {
2558
2559        public LocationSizeMarshaller() {
2560
2561        }
2562
2563        @Override
2564        public Location readPayload(DataInput dataIn) throws IOException {
2565            Location rc = new Location();
2566            rc.setDataFileId(dataIn.readInt());
2567            rc.setOffset(dataIn.readInt());
2568            if (metadata.version >= 6) {
2569                rc.setSize(dataIn.readInt());
2570            }
2571            return rc;
2572        }
2573
2574        @Override
2575        public void writePayload(Location object, DataOutput dataOut)
2576                throws IOException {
2577            dataOut.writeInt(object.getDataFileId());
2578            dataOut.writeInt(object.getOffset());
2579            dataOut.writeInt(object.getSize());
2580        }
2581
2582        @Override
2583        public int getFixedSize() {
2584            return 12;
2585        }
2586
2587        @Override
2588        public Location deepCopy(Location source) {
2589            return new Location(source);
2590        }
2591
2592        @Override
2593        public boolean isDeepCopySupported() {
2594            return true;
2595        }
2596    }
2597
2598    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2599        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2600        if (sequences == null) {
2601            sequences = new SequenceSet();
2602            sequences.add(messageSequence);
2603            sd.ackPositions.add(tx, subscriptionKey, sequences);
2604        } else {
2605            sequences.add(messageSequence);
2606            sd.ackPositions.put(tx, subscriptionKey, sequences);
2607        }
2608
2609        Long count = sd.messageReferences.get(messageSequence);
2610        if (count == null) {
2611            count = Long.valueOf(0L);
2612        }
2613        count = count.longValue() + 1;
2614        sd.messageReferences.put(messageSequence, count);
2615    }
2616
2617    // new sub is interested in potentially all existing messages
2618    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2619        SequenceSet allOutstanding = new SequenceSet();
2620        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2621        while (iterator.hasNext()) {
2622            SequenceSet set = iterator.next().getValue();
2623            for (Long entry : set) {
2624                allOutstanding.add(entry);
2625            }
2626        }
2627        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2628
2629        for (Long ackPosition : allOutstanding) {
2630            Long count = sd.messageReferences.get(ackPosition);
2631
2632            // There might not be a reference if the ackLocation was the last
2633            // one which is a placeholder for the next incoming message and
2634            // no value was added to the message references table.
2635            if (count != null) {
2636                count = count.longValue() + 1;
2637                sd.messageReferences.put(ackPosition, count);
2638            }
2639        }
2640    }
2641
2642    // on a new message add, all existing subs are interested in this message
2643    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
2644        for(String subscriptionKey : sd.subscriptionCache) {
2645            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2646            if (sequences == null) {
2647                sequences = new SequenceSet();
2648                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2649                sd.ackPositions.add(tx, subscriptionKey, sequences);
2650            } else {
2651                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2652                sd.ackPositions.put(tx, subscriptionKey, sequences);
2653            }
2654
2655            Long count = sd.messageReferences.get(messageSequence);
2656            if (count == null) {
2657                count = Long.valueOf(0L);
2658            }
2659            count = count.longValue() + 1;
2660            sd.messageReferences.put(messageSequence, count);
2661            sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
2662        }
2663    }
2664
2665    private void removeAckLocationsForSub(KahaSubscriptionCommand command,
2666            Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2667        if (!sd.ackPositions.isEmpty(tx)) {
2668            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2669            if (sequences == null || sequences.isEmpty()) {
2670                return;
2671            }
2672
2673            ArrayList<Long> unreferenced = new ArrayList<Long>();
2674
2675            for(Long sequenceId : sequences) {
2676                Long references = sd.messageReferences.get(sequenceId);
2677                if (references != null) {
2678                    references = references.longValue() - 1;
2679
2680                    if (references.longValue() > 0) {
2681                        sd.messageReferences.put(sequenceId, references);
2682                    } else {
2683                        sd.messageReferences.remove(sequenceId);
2684                        unreferenced.add(sequenceId);
2685                    }
2686                }
2687            }
2688
2689            for(Long sequenceId : unreferenced) {
2690                // Find all the entries that need to get deleted.
2691                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2692                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2693
2694                // Do the actual deletes.
2695                for (Entry<Long, MessageKeys> entry : deletes) {
2696                    sd.locationIndex.remove(tx, entry.getValue().location);
2697                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2698                    sd.orderIndex.remove(tx, entry.getKey());
2699                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2700                }
2701            }
2702        }
2703    }
2704
2705    /**
2706     * @param tx
2707     * @param sd
2708     * @param subscriptionKey
2709     * @param messageSequence
2710     * @throws IOException
2711     */
2712    private void removeAckLocation(KahaRemoveMessageCommand command,
2713            Transaction tx, StoredDestination sd, String subscriptionKey,
2714            Long messageSequence) throws IOException {
2715        // Remove the sub from the previous location set..
2716        if (messageSequence != null) {
2717            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2718            if (range != null && !range.isEmpty()) {
2719                range.remove(messageSequence);
2720                if (!range.isEmpty()) {
2721                    sd.ackPositions.put(tx, subscriptionKey, range);
2722                } else {
2723                    sd.ackPositions.remove(tx, subscriptionKey);
2724                }
2725
2726                // Check if the message is reference by any other subscription.
2727                Long count = sd.messageReferences.get(messageSequence);
2728                if (count != null) {
2729                    long references = count.longValue() - 1;
2730                    if (references > 0) {
2731                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2732                        return;
2733                    } else {
2734                        sd.messageReferences.remove(messageSequence);
2735                    }
2736                }
2737
2738                // Find all the entries that need to get deleted.
2739                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2740                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2741
2742                // Do the actual deletes.
2743                for (Entry<Long, MessageKeys> entry : deletes) {
2744                    sd.locationIndex.remove(tx, entry.getValue().location);
2745                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2746                    sd.orderIndex.remove(tx, entry.getKey());
2747                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2748                }
2749            }
2750        }
2751    }
2752
2753    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2754        return sd.subscriptionAcks.get(tx, subscriptionKey);
2755    }
2756
2757    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2758        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2759        if (messageSequences != null) {
2760            long result = messageSequences.rangeSize();
2761            // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2762            return result > 0 ? result - 1 : 0;
2763        }
2764
2765        return 0;
2766    }
2767
2768    public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2769        //grab the messages attached to this subscription
2770        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2771
2772        long locationSize = 0;
2773        if (messageSequences != null) {
2774            Sequence head = messageSequences.getHead();
2775            if (head != null) {
2776                //get an iterator over the order index starting at the first unacked message
2777                //and go over each message to add up the size
2778                Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
2779                        new MessageOrderCursor(head.getFirst()));
2780
2781                while (iterator.hasNext()) {
2782                    Entry<Long, MessageKeys> entry = iterator.next();
2783                    locationSize += entry.getValue().location.getSize();
2784                }
2785            }
2786        }
2787
2788        return locationSize;
2789    }
2790
2791    protected String key(KahaDestination destination) {
2792        return destination.getType().getNumber() + ":" + destination.getName();
2793    }
2794
2795    // /////////////////////////////////////////////////////////////////
2796    // Transaction related implementation methods.
2797    // /////////////////////////////////////////////////////////////////
2798    @SuppressWarnings("rawtypes")
2799    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2800    @SuppressWarnings("rawtypes")
2801    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2802    protected final Set<String> ackedAndPrepared = new HashSet<String>();
2803    protected final Set<String> rolledBackAcks = new HashSet<String>();
2804
2805    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2806    // till then they are skipped by the store.
2807    // 'at most once' XA guarantee
2808    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2809        this.indexLock.writeLock().lock();
2810        try {
2811            for (MessageAck ack : acks) {
2812                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
2813            }
2814        } finally {
2815            this.indexLock.writeLock().unlock();
2816        }
2817    }
2818
2819    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
2820        if (acks != null) {
2821            this.indexLock.writeLock().lock();
2822            try {
2823                for (MessageAck ack : acks) {
2824                    final String id = ack.getLastMessageId().toProducerKey();
2825                    ackedAndPrepared.remove(id);
2826                    if (rollback) {
2827                        rolledBackAcks.add(id);
2828                    }
2829                }
2830            } finally {
2831                this.indexLock.writeLock().unlock();
2832            }
2833        }
2834    }
2835
2836    @SuppressWarnings("rawtypes")
2837    private List<Operation> getInflightTx(KahaTransactionInfo info) {
2838        TransactionId key = TransactionIdConversion.convert(info);
2839        List<Operation> tx;
2840        synchronized (inflightTransactions) {
2841            tx = inflightTransactions.get(key);
2842            if (tx == null) {
2843                tx = Collections.synchronizedList(new ArrayList<Operation>());
2844                inflightTransactions.put(key, tx);
2845            }
2846        }
2847        return tx;
2848    }
2849
2850    @SuppressWarnings("unused")
2851    private TransactionId key(KahaTransactionInfo transactionInfo) {
2852        return TransactionIdConversion.convert(transactionInfo);
2853    }
2854
2855    abstract class Operation <T extends JournalCommand<T>> {
2856        final T command;
2857        final Location location;
2858
2859        public Operation(T command, Location location) {
2860            this.command = command;
2861            this.location = location;
2862        }
2863
2864        public Location getLocation() {
2865            return location;
2866        }
2867
2868        public T getCommand() {
2869            return command;
2870        }
2871
2872        abstract public void execute(Transaction tx) throws IOException;
2873    }
2874
2875    class AddOperation extends Operation<KahaAddMessageCommand> {
2876        final IndexAware runWithIndexLock;
2877        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
2878            super(command, location);
2879            this.runWithIndexLock = runWithIndexLock;
2880        }
2881
2882        @Override
2883        public void execute(Transaction tx) throws IOException {
2884            long seq = updateIndex(tx, command, location);
2885            if (runWithIndexLock != null) {
2886                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
2887            }
2888        }
2889    }
2890
2891    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
2892
2893        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
2894            super(command, location);
2895        }
2896
2897        @Override
2898        public void execute(Transaction tx) throws IOException {
2899            updateIndex(tx, command, location);
2900        }
2901    }
2902
2903    // /////////////////////////////////////////////////////////////////
2904    // Initialization related implementation methods.
2905    // /////////////////////////////////////////////////////////////////
2906
2907    private PageFile createPageFile() throws IOException {
2908        if (indexDirectory == null) {
2909            indexDirectory = directory;
2910        }
2911        IOHelper.mkdirs(indexDirectory);
2912        PageFile index = new PageFile(indexDirectory, "db");
2913        index.setEnableWriteThread(isEnableIndexWriteAsync());
2914        index.setWriteBatchSize(getIndexWriteBatchSize());
2915        index.setPageCacheSize(indexCacheSize);
2916        index.setUseLFRUEviction(isUseIndexLFRUEviction());
2917        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2918        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2919        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2920        index.setEnablePageCaching(isEnableIndexPageCaching());
2921        return index;
2922    }
2923
2924    private Journal createJournal() throws IOException {
2925        Journal manager = new Journal();
2926        manager.setDirectory(directory);
2927        manager.setMaxFileLength(getJournalMaxFileLength());
2928        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2929        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2930        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2931        manager.setArchiveDataLogs(isArchiveDataLogs());
2932        manager.setSizeAccumulator(journalSize);
2933        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2934        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
2935        manager.setPreallocationStrategy(
2936                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
2937        if (getDirectoryArchive() != null) {
2938            IOHelper.mkdirs(getDirectoryArchive());
2939            manager.setDirectoryArchive(getDirectoryArchive());
2940        }
2941        return manager;
2942    }
2943
2944    private Metadata createMetadata() {
2945        Metadata md = new Metadata();
2946        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
2947        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
2948        return md;
2949    }
2950
2951    protected abstract void configureMetadata();
2952
2953    public int getJournalMaxWriteBatchSize() {
2954        return journalMaxWriteBatchSize;
2955    }
2956
2957    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2958        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2959    }
2960
2961    public File getDirectory() {
2962        return directory;
2963    }
2964
2965    public void setDirectory(File directory) {
2966        this.directory = directory;
2967    }
2968
2969    public boolean isDeleteAllMessages() {
2970        return deleteAllMessages;
2971    }
2972
2973    public void setDeleteAllMessages(boolean deleteAllMessages) {
2974        this.deleteAllMessages = deleteAllMessages;
2975    }
2976
2977    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2978        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2979    }
2980
2981    public int getIndexWriteBatchSize() {
2982        return setIndexWriteBatchSize;
2983    }
2984
2985    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2986        this.enableIndexWriteAsync = enableIndexWriteAsync;
2987    }
2988
2989    boolean isEnableIndexWriteAsync() {
2990        return enableIndexWriteAsync;
2991    }
2992
2993    public boolean isEnableJournalDiskSyncs() {
2994        return enableJournalDiskSyncs;
2995    }
2996
2997    public void setEnableJournalDiskSyncs(boolean syncWrites) {
2998        this.enableJournalDiskSyncs = syncWrites;
2999    }
3000
3001    public long getCheckpointInterval() {
3002        return checkpointInterval;
3003    }
3004
3005    public void setCheckpointInterval(long checkpointInterval) {
3006        this.checkpointInterval = checkpointInterval;
3007    }
3008
3009    public long getCleanupInterval() {
3010        return cleanupInterval;
3011    }
3012
3013    public void setCleanupInterval(long cleanupInterval) {
3014        this.cleanupInterval = cleanupInterval;
3015    }
3016
3017    public void setJournalMaxFileLength(int journalMaxFileLength) {
3018        this.journalMaxFileLength = journalMaxFileLength;
3019    }
3020
3021    public int getJournalMaxFileLength() {
3022        return journalMaxFileLength;
3023    }
3024
3025    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
3026        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
3027    }
3028
3029    public int getMaxFailoverProducersToTrack() {
3030        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
3031    }
3032
3033    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
3034        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
3035    }
3036
3037    public int getFailoverProducersAuditDepth() {
3038        return this.metadata.producerSequenceIdTracker.getAuditDepth();
3039    }
3040
3041    public PageFile getPageFile() throws IOException {
3042        if (pageFile == null) {
3043            pageFile = createPageFile();
3044        }
3045        return pageFile;
3046    }
3047
3048    public Journal getJournal() throws IOException {
3049        if (journal == null) {
3050            journal = createJournal();
3051        }
3052        return journal;
3053    }
3054
3055    public boolean isFailIfDatabaseIsLocked() {
3056        return failIfDatabaseIsLocked;
3057    }
3058
3059    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
3060        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
3061    }
3062
3063    public boolean isIgnoreMissingJournalfiles() {
3064        return ignoreMissingJournalfiles;
3065    }
3066
3067    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
3068        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
3069    }
3070
3071    public int getIndexCacheSize() {
3072        return indexCacheSize;
3073    }
3074
3075    public void setIndexCacheSize(int indexCacheSize) {
3076        this.indexCacheSize = indexCacheSize;
3077    }
3078
3079    public boolean isCheckForCorruptJournalFiles() {
3080        return checkForCorruptJournalFiles;
3081    }
3082
3083    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
3084        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
3085    }
3086
3087    public boolean isChecksumJournalFiles() {
3088        return checksumJournalFiles;
3089    }
3090
3091    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
3092        this.checksumJournalFiles = checksumJournalFiles;
3093    }
3094
3095    @Override
3096    public void setBrokerService(BrokerService brokerService) {
3097        this.brokerService = brokerService;
3098    }
3099
3100    /**
3101     * @return the archiveDataLogs
3102     */
3103    public boolean isArchiveDataLogs() {
3104        return this.archiveDataLogs;
3105    }
3106
3107    /**
3108     * @param archiveDataLogs the archiveDataLogs to set
3109     */
3110    public void setArchiveDataLogs(boolean archiveDataLogs) {
3111        this.archiveDataLogs = archiveDataLogs;
3112    }
3113
3114    /**
3115     * @return the directoryArchive
3116     */
3117    public File getDirectoryArchive() {
3118        return this.directoryArchive;
3119    }
3120
3121    /**
3122     * @param directoryArchive the directoryArchive to set
3123     */
3124    public void setDirectoryArchive(File directoryArchive) {
3125        this.directoryArchive = directoryArchive;
3126    }
3127
3128    public boolean isArchiveCorruptedIndex() {
3129        return archiveCorruptedIndex;
3130    }
3131
3132    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
3133        this.archiveCorruptedIndex = archiveCorruptedIndex;
3134    }
3135
3136    public float getIndexLFUEvictionFactor() {
3137        return indexLFUEvictionFactor;
3138    }
3139
3140    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
3141        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
3142    }
3143
3144    public boolean isUseIndexLFRUEviction() {
3145        return useIndexLFRUEviction;
3146    }
3147
3148    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
3149        this.useIndexLFRUEviction = useIndexLFRUEviction;
3150    }
3151
3152    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
3153        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
3154    }
3155
3156    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
3157        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
3158    }
3159
3160    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
3161        this.enableIndexPageCaching = enableIndexPageCaching;
3162    }
3163
3164    public boolean isEnableIndexDiskSyncs() {
3165        return enableIndexDiskSyncs;
3166    }
3167
3168    public boolean isEnableIndexRecoveryFile() {
3169        return enableIndexRecoveryFile;
3170    }
3171
3172    public boolean isEnableIndexPageCaching() {
3173        return enableIndexPageCaching;
3174    }
3175
3176    // /////////////////////////////////////////////////////////////////
3177    // Internal conversion methods.
3178    // /////////////////////////////////////////////////////////////////
3179
3180    class MessageOrderCursor{
3181        long defaultCursorPosition;
3182        long lowPriorityCursorPosition;
3183        long highPriorityCursorPosition;
3184        MessageOrderCursor(){
3185        }
3186
3187        MessageOrderCursor(long position){
3188            this.defaultCursorPosition=position;
3189            this.lowPriorityCursorPosition=position;
3190            this.highPriorityCursorPosition=position;
3191        }
3192
3193        MessageOrderCursor(MessageOrderCursor other){
3194            this.defaultCursorPosition=other.defaultCursorPosition;
3195            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3196            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3197        }
3198
3199        MessageOrderCursor copy() {
3200            return new MessageOrderCursor(this);
3201        }
3202
3203        void reset() {
3204            this.defaultCursorPosition=0;
3205            this.highPriorityCursorPosition=0;
3206            this.lowPriorityCursorPosition=0;
3207        }
3208
3209        void increment() {
3210            if (defaultCursorPosition!=0) {
3211                defaultCursorPosition++;
3212            }
3213            if (highPriorityCursorPosition!=0) {
3214                highPriorityCursorPosition++;
3215            }
3216            if (lowPriorityCursorPosition!=0) {
3217                lowPriorityCursorPosition++;
3218            }
3219        }
3220
3221        @Override
3222        public String toString() {
3223           return "MessageOrderCursor:[def:" + defaultCursorPosition
3224                   + ", low:" + lowPriorityCursorPosition
3225                   + ", high:" +  highPriorityCursorPosition + "]";
3226        }
3227
3228        public void sync(MessageOrderCursor other) {
3229            this.defaultCursorPosition=other.defaultCursorPosition;
3230            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3231            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3232        }
3233    }
3234
3235    class MessageOrderIndex {
3236        static final byte HI = 9;
3237        static final byte LO = 0;
3238        static final byte DEF = 4;
3239
3240        long nextMessageId;
3241        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3242        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3243        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3244        final MessageOrderCursor cursor = new MessageOrderCursor();
3245        Long lastDefaultKey;
3246        Long lastHighKey;
3247        Long lastLowKey;
3248        byte lastGetPriority;
3249        final List<Long> pendingAdditions = new LinkedList<Long>();
3250        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
3251
3252        MessageKeys remove(Transaction tx, Long key) throws IOException {
3253            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3254            if (result == null && highPriorityIndex!=null) {
3255                result = highPriorityIndex.remove(tx, key);
3256                if (result ==null && lowPriorityIndex!=null) {
3257                    result = lowPriorityIndex.remove(tx, key);
3258                }
3259            }
3260            return result;
3261        }
3262
3263        void load(Transaction tx) throws IOException {
3264            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3265            defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3266            defaultPriorityIndex.load(tx);
3267            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3268            lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3269            lowPriorityIndex.load(tx);
3270            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3271            highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3272            highPriorityIndex.load(tx);
3273        }
3274
3275        void allocate(Transaction tx) throws IOException {
3276            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3277            if (metadata.version >= 2) {
3278                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3279                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3280            }
3281        }
3282
3283        void configureLast(Transaction tx) throws IOException {
3284            // Figure out the next key using the last entry in the destination.
3285            TreeSet<Long> orderedSet = new TreeSet<Long>();
3286
3287            addLast(orderedSet, highPriorityIndex, tx);
3288            addLast(orderedSet, defaultPriorityIndex, tx);
3289            addLast(orderedSet, lowPriorityIndex, tx);
3290
3291            if (!orderedSet.isEmpty()) {
3292                nextMessageId = orderedSet.last() + 1;
3293            }
3294        }
3295
3296        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3297            if (index != null) {
3298                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3299                if (lastEntry != null) {
3300                    orderedSet.add(lastEntry.getKey());
3301                }
3302            }
3303        }
3304
3305        void clear(Transaction tx) throws IOException {
3306            this.remove(tx);
3307            this.resetCursorPosition();
3308            this.allocate(tx);
3309            this.load(tx);
3310            this.configureLast(tx);
3311        }
3312
3313        void remove(Transaction tx) throws IOException {
3314            defaultPriorityIndex.clear(tx);
3315            defaultPriorityIndex.unload(tx);
3316            tx.free(defaultPriorityIndex.getPageId());
3317            if (lowPriorityIndex != null) {
3318                lowPriorityIndex.clear(tx);
3319                lowPriorityIndex.unload(tx);
3320
3321                tx.free(lowPriorityIndex.getPageId());
3322            }
3323            if (highPriorityIndex != null) {
3324                highPriorityIndex.clear(tx);
3325                highPriorityIndex.unload(tx);
3326                tx.free(highPriorityIndex.getPageId());
3327            }
3328        }
3329
3330        void resetCursorPosition() {
3331            this.cursor.reset();
3332            lastDefaultKey = null;
3333            lastHighKey = null;
3334            lastLowKey = null;
3335        }
3336
3337        void setBatch(Transaction tx, Long sequence) throws IOException {
3338            if (sequence != null) {
3339                Long nextPosition = new Long(sequence.longValue() + 1);
3340                lastDefaultKey = sequence;
3341                cursor.defaultCursorPosition = nextPosition.longValue();
3342                lastHighKey = sequence;
3343                cursor.highPriorityCursorPosition = nextPosition.longValue();
3344                lastLowKey = sequence;
3345                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3346            }
3347        }
3348
3349        void setBatch(Transaction tx, LastAck last) throws IOException {
3350            setBatch(tx, last.lastAckedSequence);
3351            if (cursor.defaultCursorPosition == 0
3352                    && cursor.highPriorityCursorPosition == 0
3353                    && cursor.lowPriorityCursorPosition == 0) {
3354                long next = last.lastAckedSequence + 1;
3355                switch (last.priority) {
3356                    case DEF:
3357                        cursor.defaultCursorPosition = next;
3358                        cursor.highPriorityCursorPosition = next;
3359                        break;
3360                    case HI:
3361                        cursor.highPriorityCursorPosition = next;
3362                        break;
3363                    case LO:
3364                        cursor.lowPriorityCursorPosition = next;
3365                        cursor.defaultCursorPosition = next;
3366                        cursor.highPriorityCursorPosition = next;
3367                        break;
3368                }
3369            }
3370        }
3371
3372        void stoppedIterating() {
3373            if (lastDefaultKey!=null) {
3374                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3375            }
3376            if (lastHighKey!=null) {
3377                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3378            }
3379            if (lastLowKey!=null) {
3380                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3381            }
3382            lastDefaultKey = null;
3383            lastHighKey = null;
3384            lastLowKey = null;
3385        }
3386
3387        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3388                throws IOException {
3389            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3390                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3391            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3392                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3393            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3394                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3395            }
3396        }
3397
3398        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3399                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3400
3401            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3402            deletes.add(iterator.next());
3403        }
3404
3405        long getNextMessageId() {
3406            return nextMessageId++;
3407        }
3408
3409        void revertNextMessageId() {
3410            nextMessageId--;
3411        }
3412
3413        MessageKeys get(Transaction tx, Long key) throws IOException {
3414            MessageKeys result = defaultPriorityIndex.get(tx, key);
3415            if (result == null) {
3416                result = highPriorityIndex.get(tx, key);
3417                if (result == null) {
3418                    result = lowPriorityIndex.get(tx, key);
3419                    lastGetPriority = LO;
3420                } else {
3421                    lastGetPriority = HI;
3422                }
3423            } else {
3424                lastGetPriority = DEF;
3425            }
3426            return result;
3427        }
3428
3429        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3430            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3431                return defaultPriorityIndex.put(tx, key, value);
3432            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3433                return highPriorityIndex.put(tx, key, value);
3434            } else {
3435                return lowPriorityIndex.put(tx, key, value);
3436            }
3437        }
3438
3439        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3440            return new MessageOrderIterator(tx,cursor,this);
3441        }
3442
3443        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3444            return new MessageOrderIterator(tx,m,this);
3445        }
3446
3447        public byte lastGetPriority() {
3448            return lastGetPriority;
3449        }
3450
3451        public boolean alreadyDispatched(Long sequence) {
3452            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3453                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3454                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3455        }
3456
3457        public void trackPendingAdd(Long seq) {
3458            synchronized (pendingAdditions) {
3459                pendingAdditions.add(seq);
3460            }
3461        }
3462
3463        public void trackPendingAddComplete(Long seq) {
3464            synchronized (pendingAdditions) {
3465                pendingAdditions.remove(seq);
3466            }
3467        }
3468
3469        public Long minPendingAdd() {
3470            synchronized (pendingAdditions) {
3471                if (!pendingAdditions.isEmpty()) {
3472                    return pendingAdditions.get(0);
3473                } else {
3474                    return null;
3475                }
3476            }
3477        }
3478
3479        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3480            Iterator<Entry<Long, MessageKeys>>currentIterator;
3481            final Iterator<Entry<Long, MessageKeys>>highIterator;
3482            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3483            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3484
3485            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3486                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3487                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3488                if (highPriorityIndex != null) {
3489                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3490                } else {
3491                    this.highIterator = null;
3492                }
3493                if (lowPriorityIndex != null) {
3494                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3495                } else {
3496                    this.lowIterator = null;
3497                }
3498            }
3499
3500            @Override
3501            public boolean hasNext() {
3502                if (currentIterator == null) {
3503                    if (highIterator != null) {
3504                        if (highIterator.hasNext()) {
3505                            currentIterator = highIterator;
3506                            return currentIterator.hasNext();
3507                        }
3508                        if (defaultIterator.hasNext()) {
3509                            currentIterator = defaultIterator;
3510                            return currentIterator.hasNext();
3511                        }
3512                        if (lowIterator.hasNext()) {
3513                            currentIterator = lowIterator;
3514                            return currentIterator.hasNext();
3515                        }
3516                        return false;
3517                    } else {
3518                        currentIterator = defaultIterator;
3519                        return currentIterator.hasNext();
3520                    }
3521                }
3522                if (highIterator != null) {
3523                    if (currentIterator.hasNext()) {
3524                        return true;
3525                    }
3526                    if (currentIterator == highIterator) {
3527                        if (defaultIterator.hasNext()) {
3528                            currentIterator = defaultIterator;
3529                            return currentIterator.hasNext();
3530                        }
3531                        if (lowIterator.hasNext()) {
3532                            currentIterator = lowIterator;
3533                            return currentIterator.hasNext();
3534                        }
3535                        return false;
3536                    }
3537
3538                    if (currentIterator == defaultIterator) {
3539                        if (lowIterator.hasNext()) {
3540                            currentIterator = lowIterator;
3541                            return currentIterator.hasNext();
3542                        }
3543                        return false;
3544                    }
3545                }
3546                return currentIterator.hasNext();
3547            }
3548
3549            @Override
3550            public Entry<Long, MessageKeys> next() {
3551                Entry<Long, MessageKeys> result = currentIterator.next();
3552                if (result != null) {
3553                    Long key = result.getKey();
3554                    if (highIterator != null) {
3555                        if (currentIterator == defaultIterator) {
3556                            lastDefaultKey = key;
3557                        } else if (currentIterator == highIterator) {
3558                            lastHighKey = key;
3559                        } else {
3560                            lastLowKey = key;
3561                        }
3562                    } else {
3563                        lastDefaultKey = key;
3564                    }
3565                }
3566                return result;
3567            }
3568
3569            @Override
3570            public void remove() {
3571                throw new UnsupportedOperationException();
3572            }
3573        }
3574    }
3575
3576    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3577        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3578
3579        @Override
3580        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3581            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3582            ObjectOutputStream oout = new ObjectOutputStream(baos);
3583            oout.writeObject(object);
3584            oout.flush();
3585            oout.close();
3586            byte[] data = baos.toByteArray();
3587            dataOut.writeInt(data.length);
3588            dataOut.write(data);
3589        }
3590
3591        @Override
3592        @SuppressWarnings("unchecked")
3593        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3594            int dataLen = dataIn.readInt();
3595            byte[] data = new byte[dataLen];
3596            dataIn.readFully(data);
3597            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3598            ObjectInputStream oin = new ObjectInputStream(bais);
3599            try {
3600                return (HashSet<String>) oin.readObject();
3601            } catch (ClassNotFoundException cfe) {
3602                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3603                ioe.initCause(cfe);
3604                throw ioe;
3605            }
3606        }
3607    }
3608
3609    public File getIndexDirectory() {
3610        return indexDirectory;
3611    }
3612
3613    public void setIndexDirectory(File indexDirectory) {
3614        this.indexDirectory = indexDirectory;
3615    }
3616
3617    interface IndexAware {
3618        public void sequenceAssignedWithIndexLocked(long index);
3619    }
3620
3621    public String getPreallocationScope() {
3622        return preallocationScope;
3623    }
3624
3625    public void setPreallocationScope(String preallocationScope) {
3626        this.preallocationScope = preallocationScope;
3627    }
3628
3629    public String getPreallocationStrategy() {
3630        return preallocationStrategy;
3631    }
3632
3633    public void setPreallocationStrategy(String preallocationStrategy) {
3634        this.preallocationStrategy = preallocationStrategy;
3635    }
3636
3637    public int getCompactAcksAfterNoGC() {
3638        return compactAcksAfterNoGC;
3639    }
3640
3641    /**
3642     * Sets the number of GC cycles where no journal logs were removed before an attempt to
3643     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
3644     * <p>
3645     * A value of -1 will disable this feature.
3646     *
3647     * @param compactAcksAfterNoGC
3648     *      Number of empty GC cycles before we rewrite old ACKS.
3649     */
3650    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
3651        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
3652    }
3653
3654    /**
3655     * Returns whether Ack compaction will ignore that the store is still growing
3656     * and run more often.
3657     *
3658     * @return the compactAcksIgnoresStoreGrowth current value.
3659     */
3660    public boolean isCompactAcksIgnoresStoreGrowth() {
3661        return compactAcksIgnoresStoreGrowth;
3662    }
3663
3664    /**
3665     * Configure if Ack compaction will occur regardless of continued growth of the
3666     * journal logs meaning that the store has not run out of space yet.  Because the
3667     * compaction operation can be costly this value is defaulted to off and the Ack
3668     * compaction is only done when it seems that the store cannot grow and larger.
3669     *
3670     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
3671     */
3672    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
3673        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
3674    }
3675
3676    /**
3677     * Returns whether Ack compaction is enabled
3678     *
3679     * @return enableAckCompaction
3680     */
3681    public boolean isEnableAckCompaction() {
3682        return enableAckCompaction;
3683    }
3684
3685    /**
3686     * Configure if the Ack compaction task should be enabled to run
3687     *
3688     * @param enableAckCompaction
3689     */
3690    public void setEnableAckCompaction(boolean enableAckCompaction) {
3691        this.enableAckCompaction = enableAckCompaction;
3692    }
3693}