001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.DataInput;
024import java.io.DataOutput;
025import java.io.EOFException;
026import java.io.File;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.InterruptedIOException;
030import java.io.ObjectInputStream;
031import java.io.ObjectOutputStream;
032import java.io.OutputStream;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Date;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.LinkedHashMap;
042import java.util.LinkedHashSet;
043import java.util.LinkedList;
044import java.util.List;
045import java.util.Map;
046import java.util.Map.Entry;
047import java.util.Set;
048import java.util.SortedSet;
049import java.util.TreeMap;
050import java.util.TreeSet;
051import java.util.concurrent.ConcurrentHashMap;
052import java.util.concurrent.ConcurrentMap;
053import java.util.concurrent.Executors;
054import java.util.concurrent.ScheduledExecutorService;
055import java.util.concurrent.ThreadFactory;
056import java.util.concurrent.TimeUnit;
057import java.util.concurrent.atomic.AtomicBoolean;
058import java.util.concurrent.atomic.AtomicLong;
059import java.util.concurrent.atomic.AtomicReference;
060import java.util.concurrent.locks.ReentrantReadWriteLock;
061
062import org.apache.activemq.ActiveMQMessageAuditNoSync;
063import org.apache.activemq.broker.BrokerService;
064import org.apache.activemq.broker.BrokerServiceAware;
065import org.apache.activemq.broker.region.Destination;
066import org.apache.activemq.broker.region.Queue;
067import org.apache.activemq.broker.region.Topic;
068import org.apache.activemq.command.MessageAck;
069import org.apache.activemq.command.TransactionId;
070import org.apache.activemq.openwire.OpenWireFormat;
071import org.apache.activemq.protobuf.Buffer;
072import org.apache.activemq.store.MessageStore;
073import org.apache.activemq.store.MessageStoreStatistics;
074import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
075import org.apache.activemq.store.TopicMessageStore;
076import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
077import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
078import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
079import org.apache.activemq.store.kahadb.data.KahaDestination;
080import org.apache.activemq.store.kahadb.data.KahaEntryType;
081import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
082import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
083import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
084import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
085import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
086import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
087import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
088import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
089import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
090import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
091import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
092import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
093import org.apache.activemq.store.kahadb.disk.index.ListIndex;
094import org.apache.activemq.store.kahadb.disk.journal.DataFile;
095import org.apache.activemq.store.kahadb.disk.journal.Journal;
096import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
097import org.apache.activemq.store.kahadb.disk.journal.Location;
098import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
099import org.apache.activemq.store.kahadb.disk.page.Page;
100import org.apache.activemq.store.kahadb.disk.page.PageFile;
101import org.apache.activemq.store.kahadb.disk.page.Transaction;
102import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
103import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
104import org.apache.activemq.store.kahadb.disk.util.Marshaller;
105import org.apache.activemq.store.kahadb.disk.util.Sequence;
106import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
107import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
108import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
109import org.apache.activemq.util.ByteSequence;
110import org.apache.activemq.util.DataByteArrayInputStream;
111import org.apache.activemq.util.DataByteArrayOutputStream;
112import org.apache.activemq.util.IOExceptionSupport;
113import org.apache.activemq.util.IOHelper;
114import org.apache.activemq.util.ServiceStopper;
115import org.apache.activemq.util.ServiceSupport;
116import org.apache.activemq.util.ThreadPoolUtils;
117import org.slf4j.Logger;
118import org.slf4j.LoggerFactory;
119import org.slf4j.MDC;
120
121public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
122
123    protected BrokerService brokerService;
124
125    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
126    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
127    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
128    protected static final Buffer UNMATCHED;
129    static {
130        UNMATCHED = new Buffer(new byte[]{});
131    }
132    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
133
134    static final int CLOSED_STATE = 1;
135    static final int OPEN_STATE = 2;
136    static final long NOT_ACKED = -1;
137
138    static final int VERSION = 6;
139
140    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
141
142    protected class Metadata {
143        protected Page<Metadata> page;
144        protected int state;
145        protected BTreeIndex<String, StoredDestination> destinations;
146        protected Location lastUpdate;
147        protected Location firstInProgressTransactionLocation;
148        protected Location producerSequenceIdTrackerLocation = null;
149        protected Location ackMessageFileMapLocation = null;
150        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
151        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
152        protected int version = VERSION;
153        protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
154
155        public void read(DataInput is) throws IOException {
156            state = is.readInt();
157            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
158            if (is.readBoolean()) {
159                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
160            } else {
161                lastUpdate = null;
162            }
163            if (is.readBoolean()) {
164                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
165            } else {
166                firstInProgressTransactionLocation = null;
167            }
168            try {
169                if (is.readBoolean()) {
170                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
171                } else {
172                    producerSequenceIdTrackerLocation = null;
173                }
174            } catch (EOFException expectedOnUpgrade) {
175            }
176            try {
177                version = is.readInt();
178            } catch (EOFException expectedOnUpgrade) {
179                version = 1;
180            }
181            if (version >= 5 && is.readBoolean()) {
182                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
183            } else {
184                ackMessageFileMapLocation = null;
185            }
186            try {
187                openwireVersion = is.readInt();
188            } catch (EOFException expectedOnUpgrade) {
189                openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
190            }
191            LOG.info("KahaDB is version " + version);
192        }
193
194        public void write(DataOutput os) throws IOException {
195            os.writeInt(state);
196            os.writeLong(destinations.getPageId());
197
198            if (lastUpdate != null) {
199                os.writeBoolean(true);
200                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
201            } else {
202                os.writeBoolean(false);
203            }
204
205            if (firstInProgressTransactionLocation != null) {
206                os.writeBoolean(true);
207                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
208            } else {
209                os.writeBoolean(false);
210            }
211
212            if (producerSequenceIdTrackerLocation != null) {
213                os.writeBoolean(true);
214                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
215            } else {
216                os.writeBoolean(false);
217            }
218            os.writeInt(VERSION);
219            if (ackMessageFileMapLocation != null) {
220                os.writeBoolean(true);
221                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
222            } else {
223                os.writeBoolean(false);
224            }
225            os.writeInt(this.openwireVersion);
226        }
227    }
228
229    class MetadataMarshaller extends VariableMarshaller<Metadata> {
230        @Override
231        public Metadata readPayload(DataInput dataIn) throws IOException {
232            Metadata rc = createMetadata();
233            rc.read(dataIn);
234            return rc;
235        }
236
237        @Override
238        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
239            object.write(dataOut);
240        }
241    }
242
243    protected PageFile pageFile;
244    protected Journal journal;
245    protected Metadata metadata = new Metadata();
246
247    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
248
249    protected boolean failIfDatabaseIsLocked;
250
251    protected boolean deleteAllMessages;
252    protected File directory = DEFAULT_DIRECTORY;
253    protected File indexDirectory = null;
254    protected ScheduledExecutorService scheduler;
255    private final Object schedulerLock = new Object();
256
257    protected String journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
258    protected boolean archiveDataLogs;
259    protected File directoryArchive;
260    protected AtomicLong journalSize = new AtomicLong(0);
261    long journalDiskSyncInterval = 1000;
262    long checkpointInterval = 5*1000;
263    long cleanupInterval = 30*1000;
264    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
265    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
266    boolean enableIndexWriteAsync = false;
267    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
268    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
269    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
270
271    protected AtomicBoolean opened = new AtomicBoolean();
272    private boolean ignoreMissingJournalfiles = false;
273    private int indexCacheSize = 10000;
274    private boolean checkForCorruptJournalFiles = false;
275    private boolean checksumJournalFiles = true;
276    protected boolean forceRecoverIndex = false;
277    private boolean archiveCorruptedIndex = false;
278    private boolean useIndexLFRUEviction = false;
279    private float indexLFUEvictionFactor = 0.2f;
280    private boolean enableIndexDiskSyncs = true;
281    private boolean enableIndexRecoveryFile = true;
282    private boolean enableIndexPageCaching = true;
283    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
284
285    private boolean enableAckCompaction = true;
286    private int compactAcksAfterNoGC = 10;
287    private boolean compactAcksIgnoresStoreGrowth = false;
288    private int checkPointCyclesWithNoGC;
289    private int journalLogOnLastCompactionCheck;
290    private boolean enableSubscriptionStatistics = false;
291
292    //only set when using JournalDiskSyncStrategy.PERIODIC
293    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
294
295    @Override
296    public void doStart() throws Exception {
297        load();
298    }
299
300    @Override
301    public void doStop(ServiceStopper stopper) throws Exception {
302        unload();
303    }
304
305    private void loadPageFile() throws IOException {
306        this.indexLock.writeLock().lock();
307        try {
308            final PageFile pageFile = getPageFile();
309            pageFile.load();
310            pageFile.tx().execute(new Transaction.Closure<IOException>() {
311                @Override
312                public void execute(Transaction tx) throws IOException {
313                    if (pageFile.getPageCount() == 0) {
314                        // First time this is created.. Initialize the metadata
315                        Page<Metadata> page = tx.allocate();
316                        assert page.getPageId() == 0;
317                        page.set(metadata);
318                        metadata.page = page;
319                        metadata.state = CLOSED_STATE;
320                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
321
322                        tx.store(metadata.page, metadataMarshaller, true);
323                    } else {
324                        Page<Metadata> page = tx.load(0, metadataMarshaller);
325                        metadata = page.get();
326                        metadata.page = page;
327                    }
328                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
329                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
330                    metadata.destinations.load(tx);
331                }
332            });
333            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
334            // Perhaps we should just keep an index of file
335            storedDestinations.clear();
336            pageFile.tx().execute(new Transaction.Closure<IOException>() {
337                @Override
338                public void execute(Transaction tx) throws IOException {
339                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
340                        Entry<String, StoredDestination> entry = iterator.next();
341                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
342                        storedDestinations.put(entry.getKey(), sd);
343
344                        if (checkForCorruptJournalFiles) {
345                            // sanity check the index also
346                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
347                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
348                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
349                                }
350                            }
351                        }
352                    }
353                }
354            });
355            pageFile.flush();
356        } finally {
357            this.indexLock.writeLock().unlock();
358        }
359    }
360
361    private void startCheckpoint() {
362        if (checkpointInterval == 0 && cleanupInterval == 0) {
363            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
364            return;
365        }
366        synchronized (schedulerLock) {
367            if (scheduler == null || scheduler.isShutdown()) {
368                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
369
370                    @Override
371                    public Thread newThread(Runnable r) {
372                        Thread schedulerThread = new Thread(r);
373
374                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
375                        schedulerThread.setDaemon(true);
376
377                        return schedulerThread;
378                    }
379                });
380
381                // Short intervals for check-point and cleanups
382                long delay;
383                if (journal.isJournalDiskSyncPeriodic()) {
384                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
385                } else {
386                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
387                }
388
389                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
390            }
391        }
392    }
393
394    private final class CheckpointRunner implements Runnable {
395
396        private long lastCheckpoint = System.currentTimeMillis();
397        private long lastCleanup = System.currentTimeMillis();
398        private long lastSync = System.currentTimeMillis();
399        private Location lastAsyncUpdate = null;
400
401        @Override
402        public void run() {
403            try {
404                // Decide on cleanup vs full checkpoint here.
405                if (opened.get()) {
406                    long now = System.currentTimeMillis();
407                    if (journal.isJournalDiskSyncPeriodic() &&
408                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
409                        Location currentUpdate = lastAsyncJournalUpdate.get();
410                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
411                            lastAsyncUpdate = currentUpdate;
412                            if (LOG.isTraceEnabled()) {
413                                LOG.trace("Writing trace command to trigger journal sync");
414                            }
415                            store(new KahaTraceCommand(), true, null, null);
416                        }
417                        lastSync = now;
418                    }
419                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
420                        checkpointCleanup(true);
421                        lastCleanup = now;
422                        lastCheckpoint = now;
423                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
424                        checkpointCleanup(false);
425                        lastCheckpoint = now;
426                    }
427                }
428            } catch (IOException ioe) {
429                LOG.error("Checkpoint failed", ioe);
430                brokerService.handleIOException(ioe);
431            } catch (Throwable e) {
432                LOG.error("Checkpoint failed", e);
433                brokerService.handleIOException(IOExceptionSupport.create(e));
434            }
435        }
436    }
437
438    public void open() throws IOException {
439        if( opened.compareAndSet(false, true) ) {
440            getJournal().start();
441            try {
442                loadPageFile();
443            } catch (Throwable t) {
444                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
445                if (LOG.isDebugEnabled()) {
446                    LOG.debug("Index load failure", t);
447                }
448                // try to recover index
449                try {
450                    pageFile.unload();
451                } catch (Exception ignore) {}
452                if (archiveCorruptedIndex) {
453                    pageFile.archive();
454                } else {
455                    pageFile.delete();
456                }
457                metadata = createMetadata();
458                //The metadata was recreated after a detect corruption so we need to
459                //reconfigure anything that was configured on the old metadata on startup
460                configureMetadata();
461                pageFile = null;
462                loadPageFile();
463            }
464            recover();
465            startCheckpoint();
466        }
467    }
468
469    public void load() throws IOException {
470        this.indexLock.writeLock().lock();
471        IOHelper.mkdirs(directory);
472        try {
473            if (deleteAllMessages) {
474                getJournal().start();
475                getJournal().delete();
476                getJournal().close();
477                journal = null;
478                getPageFile().delete();
479                LOG.info("Persistence store purged.");
480                deleteAllMessages = false;
481            }
482
483            open();
484            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
485        } finally {
486            this.indexLock.writeLock().unlock();
487        }
488    }
489
490    public void close() throws IOException, InterruptedException {
491        if( opened.compareAndSet(true, false)) {
492            checkpointLock.writeLock().lock();
493            try {
494                if (metadata.page != null) {
495                    checkpointUpdate(true);
496                }
497                pageFile.unload();
498                metadata = createMetadata();
499            } finally {
500                checkpointLock.writeLock().unlock();
501            }
502            journal.close();
503            synchronized(schedulerLock) {
504                if (scheduler != null) {
505                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
506                    scheduler = null;
507                }
508            }
509            // clear the cache and journalSize on shutdown of the store
510            storeCache.clear();
511            journalSize.set(0);
512        }
513    }
514
515    public void unload() throws IOException, InterruptedException {
516        this.indexLock.writeLock().lock();
517        try {
518            if( pageFile != null && pageFile.isLoaded() ) {
519                metadata.state = CLOSED_STATE;
520                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
521
522                if (metadata.page != null) {
523                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
524                        @Override
525                        public void execute(Transaction tx) throws IOException {
526                            tx.store(metadata.page, metadataMarshaller, true);
527                        }
528                    });
529                }
530            }
531        } finally {
532            this.indexLock.writeLock().unlock();
533        }
534        close();
535    }
536
537    // public for testing
538    @SuppressWarnings("rawtypes")
539    public Location[] getInProgressTxLocationRange() {
540        Location[] range = new Location[]{null, null};
541        synchronized (inflightTransactions) {
542            if (!inflightTransactions.isEmpty()) {
543                for (List<Operation> ops : inflightTransactions.values()) {
544                    if (!ops.isEmpty()) {
545                        trackMaxAndMin(range, ops);
546                    }
547                }
548            }
549            if (!preparedTransactions.isEmpty()) {
550                for (List<Operation> ops : preparedTransactions.values()) {
551                    if (!ops.isEmpty()) {
552                        trackMaxAndMin(range, ops);
553                    }
554                }
555            }
556        }
557        return range;
558    }
559
560    @SuppressWarnings("rawtypes")
561    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
562        Location t = ops.get(0).getLocation();
563        if (range[0] == null || t.compareTo(range[0]) <= 0) {
564            range[0] = t;
565        }
566        t = ops.get(ops.size() -1).getLocation();
567        if (range[1] == null || t.compareTo(range[1]) >= 0) {
568            range[1] = t;
569        }
570    }
571
572    class TranInfo {
573        TransactionId id;
574        Location location;
575
576        class opCount {
577            int add;
578            int remove;
579        }
580        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
581
582        @SuppressWarnings("rawtypes")
583        public void track(Operation operation) {
584            if (location == null ) {
585                location = operation.getLocation();
586            }
587            KahaDestination destination;
588            boolean isAdd = false;
589            if (operation instanceof AddOperation) {
590                AddOperation add = (AddOperation) operation;
591                destination = add.getCommand().getDestination();
592                isAdd = true;
593            } else {
594                RemoveOperation removeOpperation = (RemoveOperation) operation;
595                destination = removeOpperation.getCommand().getDestination();
596            }
597            opCount opCount = destinationOpCount.get(destination);
598            if (opCount == null) {
599                opCount = new opCount();
600                destinationOpCount.put(destination, opCount);
601            }
602            if (isAdd) {
603                opCount.add++;
604            } else {
605                opCount.remove++;
606            }
607        }
608
609        @Override
610        public String toString() {
611           StringBuffer buffer = new StringBuffer();
612           buffer.append(location).append(";").append(id).append(";\n");
613           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
614               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
615           }
616           return buffer.toString();
617        }
618    }
619
620    @SuppressWarnings("rawtypes")
621    public String getTransactions() {
622
623        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
624        synchronized (inflightTransactions) {
625            if (!inflightTransactions.isEmpty()) {
626                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
627                    TranInfo info = new TranInfo();
628                    info.id = entry.getKey();
629                    for (Operation operation : entry.getValue()) {
630                        info.track(operation);
631                    }
632                    infos.add(info);
633                }
634            }
635        }
636        synchronized (preparedTransactions) {
637            if (!preparedTransactions.isEmpty()) {
638                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
639                    TranInfo info = new TranInfo();
640                    info.id = entry.getKey();
641                    for (Operation operation : entry.getValue()) {
642                        info.track(operation);
643                    }
644                    infos.add(info);
645                }
646            }
647        }
648        return infos.toString();
649    }
650
651    /**
652     * Move all the messages that were in the journal into long term storage. We
653     * just replay and do a checkpoint.
654     *
655     * @throws IOException
656     * @throws IOException
657     * @throws IllegalStateException
658     */
659    private void recover() throws IllegalStateException, IOException {
660        this.indexLock.writeLock().lock();
661        try {
662
663            long start = System.currentTimeMillis();
664            Location afterProducerAudit = recoverProducerAudit();
665            Location afterAckMessageFile = recoverAckMessageFileMap();
666            Location lastIndoubtPosition = getRecoveryPosition();
667
668            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
669                // valid checkpoint, possible recover from afterAckMessageFile
670                afterProducerAudit = null;
671            }
672            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
673            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
674
675            if (recoveryPosition != null) {
676                int redoCounter = 0;
677                int dataFileRotationTracker = recoveryPosition.getDataFileId();
678                LOG.info("Recovering from the journal @" + recoveryPosition);
679                while (recoveryPosition != null) {
680                    try {
681                        JournalCommand<?> message = load(recoveryPosition);
682                        metadata.lastUpdate = recoveryPosition;
683                        process(message, recoveryPosition, lastIndoubtPosition);
684                        redoCounter++;
685                    } catch (IOException failedRecovery) {
686                        if (isIgnoreMissingJournalfiles()) {
687                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
688                            // track this dud location
689                            journal.corruptRecoveryLocation(recoveryPosition);
690                        } else {
691                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
692                        }
693                    }
694                    recoveryPosition = journal.getNextLocation(recoveryPosition);
695                    // hold on to the minimum number of open files during recovery
696                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
697                        dataFileRotationTracker = recoveryPosition.getDataFileId();
698                        journal.cleanup();
699                    }
700                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
701                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
702                    }
703                }
704                if (LOG.isInfoEnabled()) {
705                    long end = System.currentTimeMillis();
706                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
707                }
708            }
709
710            // We may have to undo some index updates.
711            pageFile.tx().execute(new Transaction.Closure<IOException>() {
712                @Override
713                public void execute(Transaction tx) throws IOException {
714                    recoverIndex(tx);
715                }
716            });
717
718            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
719            Set<TransactionId> toRollback = new HashSet<TransactionId>();
720            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
721            synchronized (inflightTransactions) {
722                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
723                    TransactionId id = it.next();
724                    if (id.isLocalTransaction()) {
725                        toRollback.add(id);
726                    } else {
727                        toDiscard.add(id);
728                    }
729                }
730                for (TransactionId tx: toRollback) {
731                    if (LOG.isDebugEnabled()) {
732                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
733                    }
734                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
735                }
736                for (TransactionId tx: toDiscard) {
737                    if (LOG.isDebugEnabled()) {
738                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
739                    }
740                    inflightTransactions.remove(tx);
741                }
742            }
743
744            synchronized (preparedTransactions) {
745                for (TransactionId txId : preparedTransactions.keySet()) {
746                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
747                }
748            }
749
750        } finally {
751            this.indexLock.writeLock().unlock();
752        }
753    }
754
755    @SuppressWarnings("unused")
756    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
757        return TransactionIdConversion.convertToLocal(tx);
758    }
759
760    private Location minimum(Location x,
761                             Location y) {
762        Location min = null;
763        if (x != null) {
764            min = x;
765            if (y != null) {
766                int compare = y.compareTo(x);
767                if (compare < 0) {
768                    min = y;
769                }
770            }
771        } else {
772            min = y;
773        }
774        return min;
775    }
776
777    private Location recoverProducerAudit() throws IOException {
778        if (metadata.producerSequenceIdTrackerLocation != null) {
779            try {
780                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
781                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
782                int maxNumProducers = getMaxFailoverProducersToTrack();
783                int maxAuditDepth = getFailoverProducersAuditDepth();
784                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
785                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
786                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
787                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
788            } catch (Exception e) {
789                LOG.warn("Cannot recover message audit", e);
790                return journal.getNextLocation(null);
791            }
792        } else {
793            // got no audit stored so got to recreate via replay from start of the journal
794            return journal.getNextLocation(null);
795        }
796    }
797
798    @SuppressWarnings("unchecked")
799    private Location recoverAckMessageFileMap() throws IOException {
800        if (metadata.ackMessageFileMapLocation != null) {
801            try {
802                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
803                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
804                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
805                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
806            } catch (Exception e) {
807                LOG.warn("Cannot recover ackMessageFileMap", e);
808                return journal.getNextLocation(null);
809            }
810        } else {
811            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
812            return journal.getNextLocation(null);
813        }
814    }
815
816    protected void recoverIndex(Transaction tx) throws IOException {
817        long start = System.currentTimeMillis();
818        // It is possible index updates got applied before the journal updates..
819        // in that case we need to removed references to messages that are not in the journal
820        final Location lastAppendLocation = journal.getLastAppendLocation();
821        long undoCounter=0;
822
823        // Go through all the destinations to see if they have messages past the lastAppendLocation
824        for (String key : storedDestinations.keySet()) {
825            StoredDestination sd = storedDestinations.get(key);
826
827            final ArrayList<Long> matches = new ArrayList<Long>();
828            // Find all the Locations that are >= than the last Append Location.
829            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
830                @Override
831                protected void matched(Location key, Long value) {
832                    matches.add(value);
833                }
834            });
835
836            for (Long sequenceId : matches) {
837                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
838                if (keys != null) {
839                    sd.locationIndex.remove(tx, keys.location);
840                    sd.messageIdIndex.remove(tx, keys.messageId);
841                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
842                    undoCounter++;
843                    decrementAndSubSizeToStoreStat(key, keys.location.getSize());
844                    // TODO: do we need to modify the ack positions for the pub sub case?
845                }
846            }
847        }
848
849        if (undoCounter > 0) {
850            // The rolledback operations are basically in flight journal writes.  To avoid getting
851            // these the end user should do sync writes to the journal.
852            if (LOG.isInfoEnabled()) {
853                long end = System.currentTimeMillis();
854                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
855            }
856        }
857
858        undoCounter = 0;
859        start = System.currentTimeMillis();
860
861        // Lets be extra paranoid here and verify that all the datafiles being referenced
862        // by the indexes still exists.
863
864        final SequenceSet ss = new SequenceSet();
865        for (StoredDestination sd : storedDestinations.values()) {
866            // Use a visitor to cut down the number of pages that we load
867            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
868                int last=-1;
869
870                @Override
871                public boolean isInterestedInKeysBetween(Location first, Location second) {
872                    if( first==null ) {
873                        return !ss.contains(0, second.getDataFileId());
874                    } else if( second==null ) {
875                        return true;
876                    } else {
877                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
878                    }
879                }
880
881                @Override
882                public void visit(List<Location> keys, List<Long> values) {
883                    for (Location l : keys) {
884                        int fileId = l.getDataFileId();
885                        if( last != fileId ) {
886                            ss.add(fileId);
887                            last = fileId;
888                        }
889                    }
890                }
891
892            });
893        }
894        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
895        while (!ss.isEmpty()) {
896            missingJournalFiles.add((int) ss.removeFirst());
897        }
898
899        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
900            missingJournalFiles.add(entry.getKey());
901            for (Integer i : entry.getValue()) {
902                missingJournalFiles.add(i);
903            }
904        }
905
906        missingJournalFiles.removeAll(journal.getFileMap().keySet());
907
908        if (!missingJournalFiles.isEmpty()) {
909            LOG.warn("Some journal files are missing: " + missingJournalFiles);
910        }
911
912        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
913        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
914        for (Integer missing : missingJournalFiles) {
915            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
916        }
917
918        if (checkForCorruptJournalFiles) {
919            Collection<DataFile> dataFiles = journal.getFileMap().values();
920            for (DataFile dataFile : dataFiles) {
921                int id = dataFile.getDataFileId();
922                // eof to next file id
923                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
924                Sequence seq = dataFile.getCorruptedBlocks().getHead();
925                while (seq != null) {
926                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
927                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
928                    missingPredicates.add(visitor);
929                    knownCorruption.add(visitor);
930                    seq = seq.getNext();
931                }
932            }
933        }
934
935        if (!missingPredicates.isEmpty()) {
936            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
937                final StoredDestination sd = sdEntry.getValue();
938                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
939                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
940                    @Override
941                    protected void matched(Location key, Long value) {
942                        matches.put(value, key);
943                    }
944                });
945
946                // If some message references are affected by the missing data files...
947                if (!matches.isEmpty()) {
948
949                    // We either 'gracefully' recover dropping the missing messages or
950                    // we error out.
951                    if( ignoreMissingJournalfiles ) {
952                        // Update the index to remove the references to the missing data
953                        for (Long sequenceId : matches.keySet()) {
954                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
955                            sd.locationIndex.remove(tx, keys.location);
956                            sd.messageIdIndex.remove(tx, keys.messageId);
957                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
958                            undoCounter++;
959                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
960                            // TODO: do we need to modify the ack positions for the pub sub case?
961                        }
962                    } else {
963                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
964                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
965                    }
966                }
967            }
968        }
969
970        if (!ignoreMissingJournalfiles) {
971            if (!knownCorruption.isEmpty()) {
972                LOG.error("Detected corrupt journal files. " + knownCorruption);
973                throw new IOException("Detected corrupt journal files. " + knownCorruption);
974            }
975
976            if (!missingJournalFiles.isEmpty()) {
977                LOG.error("Detected missing journal files. " + missingJournalFiles);
978                throw new IOException("Detected missing journal files. " + missingJournalFiles);
979            }
980        }
981
982        if (undoCounter > 0) {
983            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
984            // should do sync writes to the journal.
985            if (LOG.isInfoEnabled()) {
986                long end = System.currentTimeMillis();
987                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
988            }
989        }
990    }
991
992    private Location nextRecoveryPosition;
993    private Location lastRecoveryPosition;
994
995    public void incrementalRecover() throws IOException {
996        this.indexLock.writeLock().lock();
997        try {
998            if( nextRecoveryPosition == null ) {
999                if( lastRecoveryPosition==null ) {
1000                    nextRecoveryPosition = getRecoveryPosition();
1001                } else {
1002                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1003                }
1004            }
1005            while (nextRecoveryPosition != null) {
1006                lastRecoveryPosition = nextRecoveryPosition;
1007                metadata.lastUpdate = lastRecoveryPosition;
1008                JournalCommand<?> message = load(lastRecoveryPosition);
1009                process(message, lastRecoveryPosition, (IndexAware) null);
1010                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1011            }
1012        } finally {
1013            this.indexLock.writeLock().unlock();
1014        }
1015    }
1016
1017    public Location getLastUpdatePosition() throws IOException {
1018        return metadata.lastUpdate;
1019    }
1020
1021    private Location getRecoveryPosition() throws IOException {
1022
1023        if (!this.forceRecoverIndex) {
1024
1025            // If we need to recover the transactions..
1026            if (metadata.firstInProgressTransactionLocation != null) {
1027                return metadata.firstInProgressTransactionLocation;
1028            }
1029
1030            // Perhaps there were no transactions...
1031            if( metadata.lastUpdate!=null) {
1032                // Start replay at the record after the last one recorded in the index file.
1033                return getNextInitializedLocation(metadata.lastUpdate);
1034            }
1035        }
1036        // This loads the first position.
1037        return journal.getNextLocation(null);
1038    }
1039
1040    private Location getNextInitializedLocation(Location location) throws IOException {
1041        Location mayNotBeInitialized = journal.getNextLocation(location);
1042        if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
1043            // need to init size and type to skip
1044            return journal.getNextLocation(mayNotBeInitialized);
1045        } else {
1046            return mayNotBeInitialized;
1047        }
1048    }
1049
1050    protected void checkpointCleanup(final boolean cleanup) throws IOException {
1051        long start;
1052        this.indexLock.writeLock().lock();
1053        try {
1054            start = System.currentTimeMillis();
1055            if( !opened.get() ) {
1056                return;
1057            }
1058        } finally {
1059            this.indexLock.writeLock().unlock();
1060        }
1061        checkpointUpdate(cleanup);
1062        long end = System.currentTimeMillis();
1063        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1064            if (LOG.isInfoEnabled()) {
1065                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
1066            }
1067        }
1068    }
1069
1070    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
1071        int size = data.serializedSizeFramed();
1072        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
1073        os.writeByte(data.type().getNumber());
1074        data.writeFramed(os);
1075        return os.toByteSequence();
1076    }
1077
1078    // /////////////////////////////////////////////////////////////////
1079    // Methods call by the broker to update and query the store.
1080    // /////////////////////////////////////////////////////////////////
1081    public Location store(JournalCommand<?> data) throws IOException {
1082        return store(data, false, null,null);
1083    }
1084
1085    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
1086        return store(data, false, null, null, onJournalStoreComplete);
1087    }
1088
1089    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
1090        return store(data, sync, before, after, null);
1091    }
1092
1093    /**
1094     * All updated are are funneled through this method. The updates are converted
1095     * to a JournalMessage which is logged to the journal and then the data from
1096     * the JournalMessage is used to update the index just like it would be done
1097     * during a recovery process.
1098     */
1099    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
1100        try {
1101            ByteSequence sequence = toByteSequence(data);
1102            Location location;
1103
1104            checkpointLock.readLock().lock();
1105            try {
1106
1107                long start = System.currentTimeMillis();
1108                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
1109                long start2 = System.currentTimeMillis();
1110                //Track the last async update so we know if we need to sync at the next checkpoint
1111                if (!sync && journal.isJournalDiskSyncPeriodic()) {
1112                    lastAsyncJournalUpdate.set(location);
1113                }
1114                process(data, location, before);
1115
1116                long end = System.currentTimeMillis();
1117                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1118                    if (LOG.isInfoEnabled()) {
1119                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
1120                    }
1121                }
1122            } finally {
1123                checkpointLock.readLock().unlock();
1124            }
1125
1126            if (after != null) {
1127                after.run();
1128            }
1129
1130            if (scheduler == null && opened.get()) {
1131                startCheckpoint();
1132            }
1133            return location;
1134        } catch (IOException ioe) {
1135            LOG.error("KahaDB failed to store to Journal", ioe);
1136            brokerService.handleIOException(ioe);
1137            throw ioe;
1138        }
1139    }
1140
1141    /**
1142     * Loads a previously stored JournalMessage
1143     *
1144     * @param location
1145     * @return
1146     * @throws IOException
1147     */
1148    public JournalCommand<?> load(Location location) throws IOException {
1149        long start = System.currentTimeMillis();
1150        ByteSequence data = journal.read(location);
1151        long end = System.currentTimeMillis();
1152        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1153            if (LOG.isInfoEnabled()) {
1154                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1155            }
1156        }
1157        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1158        byte readByte = is.readByte();
1159        KahaEntryType type = KahaEntryType.valueOf(readByte);
1160        if( type == null ) {
1161            try {
1162                is.close();
1163            } catch (IOException e) {}
1164            throw new IOException("Could not load journal record. Invalid location: "+location);
1165        }
1166        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1167        message.mergeFramed(is);
1168        return message;
1169    }
1170
1171    /**
1172     * do minimal recovery till we reach the last inDoubtLocation
1173     * @param data
1174     * @param location
1175     * @param inDoubtlocation
1176     * @throws IOException
1177     */
1178    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1179        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1180            process(data, location, (IndexAware) null);
1181        } else {
1182            // just recover producer audit
1183            data.visit(new Visitor() {
1184                @Override
1185                public void visit(KahaAddMessageCommand command) throws IOException {
1186                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1187                }
1188            });
1189        }
1190    }
1191
1192    // /////////////////////////////////////////////////////////////////
1193    // Journaled record processing methods. Once the record is journaled,
1194    // these methods handle applying the index updates. These may be called
1195    // from the recovery method too so they need to be idempotent
1196    // /////////////////////////////////////////////////////////////////
1197
1198    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
1199        data.visit(new Visitor() {
1200            @Override
1201            public void visit(KahaAddMessageCommand command) throws IOException {
1202                process(command, location, onSequenceAssignedCallback);
1203            }
1204
1205            @Override
1206            public void visit(KahaRemoveMessageCommand command) throws IOException {
1207                process(command, location);
1208            }
1209
1210            @Override
1211            public void visit(KahaPrepareCommand command) throws IOException {
1212                process(command, location);
1213            }
1214
1215            @Override
1216            public void visit(KahaCommitCommand command) throws IOException {
1217                process(command, location, onSequenceAssignedCallback);
1218            }
1219
1220            @Override
1221            public void visit(KahaRollbackCommand command) throws IOException {
1222                process(command, location);
1223            }
1224
1225            @Override
1226            public void visit(KahaRemoveDestinationCommand command) throws IOException {
1227                process(command, location);
1228            }
1229
1230            @Override
1231            public void visit(KahaSubscriptionCommand command) throws IOException {
1232                process(command, location);
1233            }
1234
1235            @Override
1236            public void visit(KahaProducerAuditCommand command) throws IOException {
1237                processLocation(location);
1238            }
1239
1240            @Override
1241            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
1242                processLocation(location);
1243            }
1244
1245            @Override
1246            public void visit(KahaTraceCommand command) {
1247                processLocation(location);
1248            }
1249
1250            @Override
1251            public void visit(KahaUpdateMessageCommand command) throws IOException {
1252                process(command, location);
1253            }
1254
1255            @Override
1256            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
1257                process(command, location);
1258            }
1259        });
1260    }
1261
1262    @SuppressWarnings("rawtypes")
1263    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1264        if (command.hasTransactionInfo()) {
1265            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1266            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1267        } else {
1268            this.indexLock.writeLock().lock();
1269            try {
1270                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1271                    @Override
1272                    public void execute(Transaction tx) throws IOException {
1273                        long assignedIndex = updateIndex(tx, command, location);
1274                        if (runWithIndexLock != null) {
1275                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1276                        }
1277                    }
1278                });
1279
1280            } finally {
1281                this.indexLock.writeLock().unlock();
1282            }
1283        }
1284    }
1285
1286    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1287        this.indexLock.writeLock().lock();
1288        try {
1289            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1290                @Override
1291                public void execute(Transaction tx) throws IOException {
1292                    updateIndex(tx, command, location);
1293                }
1294            });
1295        } finally {
1296            this.indexLock.writeLock().unlock();
1297        }
1298    }
1299
1300    @SuppressWarnings("rawtypes")
1301    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1302        if (command.hasTransactionInfo()) {
1303           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1304           inflightTx.add(new RemoveOperation(command, location));
1305        } else {
1306            this.indexLock.writeLock().lock();
1307            try {
1308                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1309                    @Override
1310                    public void execute(Transaction tx) throws IOException {
1311                        updateIndex(tx, command, location);
1312                    }
1313                });
1314            } finally {
1315                this.indexLock.writeLock().unlock();
1316            }
1317        }
1318    }
1319
1320    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1321        this.indexLock.writeLock().lock();
1322        try {
1323            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1324                @Override
1325                public void execute(Transaction tx) throws IOException {
1326                    updateIndex(tx, command, location);
1327                }
1328            });
1329        } finally {
1330            this.indexLock.writeLock().unlock();
1331        }
1332    }
1333
1334    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1335        this.indexLock.writeLock().lock();
1336        try {
1337            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1338                @Override
1339                public void execute(Transaction tx) throws IOException {
1340                    updateIndex(tx, command, location);
1341                }
1342            });
1343        } finally {
1344            this.indexLock.writeLock().unlock();
1345        }
1346    }
1347
1348    protected void processLocation(final Location location) {
1349        this.indexLock.writeLock().lock();
1350        try {
1351            metadata.lastUpdate = location;
1352        } finally {
1353            this.indexLock.writeLock().unlock();
1354        }
1355    }
1356
1357    @SuppressWarnings("rawtypes")
1358    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1359        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1360        List<Operation> inflightTx;
1361        synchronized (inflightTransactions) {
1362            inflightTx = inflightTransactions.remove(key);
1363            if (inflightTx == null) {
1364                inflightTx = preparedTransactions.remove(key);
1365            }
1366        }
1367        if (inflightTx == null) {
1368            // only non persistent messages in this tx
1369            if (before != null) {
1370                before.sequenceAssignedWithIndexLocked(-1);
1371            }
1372            return;
1373        }
1374
1375        final List<Operation> messagingTx = inflightTx;
1376        indexLock.writeLock().lock();
1377        try {
1378            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1379                @Override
1380                public void execute(Transaction tx) throws IOException {
1381                    for (Operation op : messagingTx) {
1382                        op.execute(tx);
1383                    }
1384                }
1385            });
1386            metadata.lastUpdate = location;
1387        } finally {
1388            indexLock.writeLock().unlock();
1389        }
1390    }
1391
1392    @SuppressWarnings("rawtypes")
1393    protected void process(KahaPrepareCommand command, Location location) {
1394        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1395        synchronized (inflightTransactions) {
1396            List<Operation> tx = inflightTransactions.remove(key);
1397            if (tx != null) {
1398                preparedTransactions.put(key, tx);
1399            }
1400        }
1401    }
1402
1403    @SuppressWarnings("rawtypes")
1404    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1405        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1406        List<Operation> updates = null;
1407        synchronized (inflightTransactions) {
1408            updates = inflightTransactions.remove(key);
1409            if (updates == null) {
1410                updates = preparedTransactions.remove(key);
1411            }
1412        }
1413    }
1414
1415    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1416        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1417
1418        // Mark the current journal file as a compacted file so that gc checks can skip
1419        // over logs that are smaller compaction type logs.
1420        DataFile current = journal.getDataFileById(location.getDataFileId());
1421        current.setTypeCode(command.getRewriteType());
1422
1423        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1424            // Move offset so that next location read jumps to next file.
1425            location.setOffset(journalMaxFileLength);
1426        }
1427    }
1428
1429    // /////////////////////////////////////////////////////////////////
1430    // These methods do the actual index updates.
1431    // /////////////////////////////////////////////////////////////////
1432
1433    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1434    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1435
1436    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1437        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1438
1439        // Skip adding the message to the index if this is a topic and there are
1440        // no subscriptions.
1441        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1442            return -1;
1443        }
1444
1445        // Add the message.
1446        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1447        long id = sd.orderIndex.getNextMessageId();
1448        Long previous = sd.locationIndex.put(tx, location, id);
1449        if (previous == null) {
1450            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1451            if (previous == null) {
1452                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1453                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1454                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1455                    addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
1456                }
1457                metadata.lastUpdate = location;
1458            } else {
1459
1460                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1461                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
1462                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
1463                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1464                }
1465                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1466                sd.locationIndex.remove(tx, location);
1467                id = -1;
1468            }
1469        } else {
1470            // restore the previous value.. Looks like this was a redo of a previously
1471            // added message. We don't want to assign it a new id as the other indexes would
1472            // be wrong..
1473            sd.locationIndex.put(tx, location, previous);
1474            // ensure sequence is not broken
1475            sd.orderIndex.revertNextMessageId();
1476            metadata.lastUpdate = location;
1477        }
1478        // record this id in any event, initial send or recovery
1479        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1480
1481       return id;
1482    }
1483
1484    void trackPendingAdd(KahaDestination destination, Long seq) {
1485        StoredDestination sd = storedDestinations.get(key(destination));
1486        if (sd != null) {
1487            sd.trackPendingAdd(seq);
1488        }
1489    }
1490
1491    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1492        StoredDestination sd = storedDestinations.get(key(destination));
1493        if (sd != null) {
1494            sd.trackPendingAddComplete(seq);
1495        }
1496    }
1497
1498    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1499        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1500        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1501
1502        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1503        if (id != null) {
1504            MessageKeys previousKeys = sd.orderIndex.put(
1505                    tx,
1506                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1507                    id,
1508                    new MessageKeys(command.getMessageId(), location)
1509            );
1510            sd.locationIndex.put(tx, location, id);
1511            incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1512
1513            if (previousKeys != null) {
1514                //Remove the existing from the size
1515                decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
1516
1517                //update all the subscription metrics
1518                if (enableSubscriptionStatistics && location.getSize() != previousKeys.location.getSize()) {
1519                    Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx);
1520                    while (iter.hasNext()) {
1521                        Entry<String, SequenceSet> e = iter.next();
1522                        if (e.getValue().contains(id)) {
1523                            incrementAndAddSizeToStoreStat(key(command.getDestination()), e.getKey(), location.getSize());
1524                            decrementAndSubSizeToStoreStat(key(command.getDestination()), e.getKey(), previousKeys.location.getSize());
1525                        }
1526                    }
1527                }
1528
1529                // on first update previous is original location, on recovery/replay it may be the updated location
1530                if(!previousKeys.location.equals(location)) {
1531                    sd.locationIndex.remove(tx, previousKeys.location);
1532                }
1533            }
1534            metadata.lastUpdate = location;
1535        } else {
1536            //Add the message if it can't be found
1537            this.updateIndex(tx, command, location);
1538        }
1539    }
1540
1541    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1542        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1543        if (!command.hasSubscriptionKey()) {
1544
1545            // In the queue case we just remove the message from the index..
1546            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1547            if (sequenceId != null) {
1548                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1549                if (keys != null) {
1550                    sd.locationIndex.remove(tx, keys.location);
1551                    decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
1552                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1553                    metadata.lastUpdate = ackLocation;
1554                }  else if (LOG.isDebugEnabled()) {
1555                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1556                }
1557            } else if (LOG.isDebugEnabled()) {
1558                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1559            }
1560        } else {
1561            // In the topic case we need remove the message once it's been acked
1562            // by all the subs
1563            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1564
1565            // Make sure it's a valid message id...
1566            if (sequence != null) {
1567                String subscriptionKey = command.getSubscriptionKey();
1568                if (command.getAck() != UNMATCHED) {
1569                    sd.orderIndex.get(tx, sequence);
1570                    byte priority = sd.orderIndex.lastGetPriority();
1571                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1572                }
1573
1574                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1575                if (keys != null) {
1576                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1577                }
1578                // The following method handles deleting un-referenced messages.
1579                removeAckLocation(command, tx, sd, subscriptionKey, sequence);
1580                metadata.lastUpdate = ackLocation;
1581            } else if (LOG.isDebugEnabled()) {
1582                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1583            }
1584
1585        }
1586    }
1587
1588    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1589        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1590        if (referenceFileIds == null) {
1591            referenceFileIds = new HashSet<Integer>();
1592            referenceFileIds.add(messageLocation.getDataFileId());
1593            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1594        } else {
1595            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1596            if (!referenceFileIds.contains(id)) {
1597                referenceFileIds.add(id);
1598            }
1599        }
1600    }
1601
1602    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1603        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1604        sd.orderIndex.remove(tx);
1605
1606        sd.locationIndex.clear(tx);
1607        sd.locationIndex.unload(tx);
1608        tx.free(sd.locationIndex.getPageId());
1609
1610        sd.messageIdIndex.clear(tx);
1611        sd.messageIdIndex.unload(tx);
1612        tx.free(sd.messageIdIndex.getPageId());
1613
1614        if (sd.subscriptions != null) {
1615            sd.subscriptions.clear(tx);
1616            sd.subscriptions.unload(tx);
1617            tx.free(sd.subscriptions.getPageId());
1618
1619            sd.subscriptionAcks.clear(tx);
1620            sd.subscriptionAcks.unload(tx);
1621            tx.free(sd.subscriptionAcks.getPageId());
1622
1623            sd.ackPositions.clear(tx);
1624            sd.ackPositions.unload(tx);
1625            tx.free(sd.ackPositions.getHeadPageId());
1626
1627            sd.subLocations.clear(tx);
1628            sd.subLocations.unload(tx);
1629            tx.free(sd.subLocations.getHeadPageId());
1630        }
1631
1632        String key = key(command.getDestination());
1633        storedDestinations.remove(key);
1634        metadata.destinations.remove(tx, key);
1635        clearStoreStats(command.getDestination());
1636        storeCache.remove(key(command.getDestination()));
1637    }
1638
1639    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1640        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1641        final String subscriptionKey = command.getSubscriptionKey();
1642
1643        // If set then we are creating it.. otherwise we are destroying the sub
1644        if (command.hasSubscriptionInfo()) {
1645            Location existing = sd.subLocations.get(tx, subscriptionKey);
1646            if (existing != null && existing.compareTo(location) == 0) {
1647                // replay on recovery, ignore
1648                LOG.trace("ignoring journal replay of replay of sub from: " + location);
1649                return;
1650            }
1651
1652            sd.subscriptions.put(tx, subscriptionKey, command);
1653            sd.subLocations.put(tx, subscriptionKey, location);
1654            long ackLocation=NOT_ACKED;
1655            if (!command.getRetroactive()) {
1656                ackLocation = sd.orderIndex.nextMessageId-1;
1657            } else {
1658                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1659            }
1660            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1661            sd.subscriptionCache.add(subscriptionKey);
1662        } else {
1663            // delete the sub...
1664            sd.subscriptions.remove(tx, subscriptionKey);
1665            sd.subLocations.remove(tx, subscriptionKey);
1666            sd.subscriptionAcks.remove(tx, subscriptionKey);
1667            sd.subscriptionCache.remove(subscriptionKey);
1668            removeAckLocationsForSub(command, tx, sd, subscriptionKey);
1669            MessageStoreSubscriptionStatistics subStats = getSubStats(key(command.getDestination()));
1670            if (subStats != null) {
1671                subStats.removeSubscription(subscriptionKey);
1672            }
1673
1674            if (sd.subscriptions.isEmpty(tx)) {
1675                // remove the stored destination
1676                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1677                removeDestinationCommand.setDestination(command.getDestination());
1678                updateIndex(tx, removeDestinationCommand, null);
1679                clearStoreStats(command.getDestination());
1680            }
1681        }
1682    }
1683
1684    private void checkpointUpdate(final boolean cleanup) throws IOException {
1685        checkpointLock.writeLock().lock();
1686        try {
1687            this.indexLock.writeLock().lock();
1688            try {
1689                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1690                    @Override
1691                    public void execute(Transaction tx) throws IOException {
1692                        checkpointUpdate(tx, cleanup);
1693                    }
1694                });
1695            } finally {
1696                this.indexLock.writeLock().unlock();
1697            }
1698
1699        } finally {
1700            checkpointLock.writeLock().unlock();
1701        }
1702    }
1703
1704    /**
1705     * @param tx
1706     * @throws IOException
1707     */
1708    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1709        MDC.put("activemq.persistenceDir", getDirectory().getName());
1710        LOG.debug("Checkpoint started.");
1711
1712        // reflect last update exclusive of current checkpoint
1713        Location lastUpdate = metadata.lastUpdate;
1714
1715        metadata.state = OPEN_STATE;
1716        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1717        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1718        Location[] inProgressTxRange = getInProgressTxLocationRange();
1719        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1720        tx.store(metadata.page, metadataMarshaller, true);
1721        pageFile.flush();
1722
1723        if (cleanup) {
1724
1725            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1726            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1727
1728            if (LOG.isTraceEnabled()) {
1729                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1730            }
1731
1732            if (lastUpdate != null) {
1733                gcCandidateSet.remove(lastUpdate.getDataFileId());
1734            }
1735
1736            // Don't GC files under replication
1737            if( journalFilesBeingReplicated!=null ) {
1738                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1739            }
1740
1741            if (metadata.producerSequenceIdTrackerLocation != null) {
1742                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1743                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1744                    // rewrite so we don't prevent gc
1745                    metadata.producerSequenceIdTracker.setModified(true);
1746                    if (LOG.isTraceEnabled()) {
1747                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1748                    }
1749                }
1750                gcCandidateSet.remove(dataFileId);
1751                if (LOG.isTraceEnabled()) {
1752                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet);
1753                }
1754            }
1755
1756            if (metadata.ackMessageFileMapLocation != null) {
1757                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1758                gcCandidateSet.remove(dataFileId);
1759                if (LOG.isTraceEnabled()) {
1760                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
1761                }
1762            }
1763
1764            // Don't GC files referenced by in-progress tx
1765            if (inProgressTxRange[0] != null) {
1766                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1767                    gcCandidateSet.remove(pendingTx);
1768                }
1769            }
1770            if (LOG.isTraceEnabled()) {
1771                LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1772            }
1773
1774            // Go through all the destinations to see if any of them can remove GC candidates.
1775            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1776                if( gcCandidateSet.isEmpty() ) {
1777                    break;
1778                }
1779
1780                // Use a visitor to cut down the number of pages that we load
1781                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1782                    int last=-1;
1783                    @Override
1784                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1785                        if( first==null ) {
1786                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1787                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1788                                subset.remove(second.getDataFileId());
1789                            }
1790                            return !subset.isEmpty();
1791                        } else if( second==null ) {
1792                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1793                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1794                                subset.remove(first.getDataFileId());
1795                            }
1796                            return !subset.isEmpty();
1797                        } else {
1798                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1799                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1800                                subset.remove(first.getDataFileId());
1801                            }
1802                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1803                                subset.remove(second.getDataFileId());
1804                            }
1805                            return !subset.isEmpty();
1806                        }
1807                    }
1808
1809                    @Override
1810                    public void visit(List<Location> keys, List<Long> values) {
1811                        for (Location l : keys) {
1812                            int fileId = l.getDataFileId();
1813                            if( last != fileId ) {
1814                                gcCandidateSet.remove(fileId);
1815                                last = fileId;
1816                            }
1817                        }
1818                    }
1819                });
1820
1821                // Durable Subscription
1822                if (entry.getValue().subLocations != null) {
1823                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1824                    while (iter.hasNext()) {
1825                        Entry<String, Location> subscription = iter.next();
1826                        int dataFileId = subscription.getValue().getDataFileId();
1827
1828                        // Move subscription along if it has no outstanding messages that need ack'd
1829                        // and its in the last log file in the journal.
1830                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1831                            final StoredDestination destination = entry.getValue();
1832                            final String subscriptionKey = subscription.getKey();
1833                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1834
1835                            // When pending is size one that is the next message Id meaning there
1836                            // are no pending messages currently.
1837                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1838                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1839
1840                                if (LOG.isTraceEnabled()) {
1841                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1842                                }
1843
1844                                final KahaSubscriptionCommand kahaSub =
1845                                    destination.subscriptions.get(tx, subscriptionKey);
1846                                destination.subLocations.put(
1847                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1848
1849                                // Skips the remove from candidates if we rewrote the subscription
1850                                // in order to prevent duplicate subscription commands on recover.
1851                                // If another subscription is on the same file and isn't rewritten
1852                                // than it will remove the file from the set.
1853                                continue;
1854                            }
1855                        }
1856
1857                        gcCandidateSet.remove(dataFileId);
1858                    }
1859                }
1860
1861                if (LOG.isTraceEnabled()) {
1862                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1863                }
1864            }
1865
1866            // check we are not deleting file with ack for in-use journal files
1867            if (LOG.isTraceEnabled()) {
1868                LOG.trace("gc candidates: " + gcCandidateSet);
1869                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1870            }
1871
1872            boolean ackMessageFileMapMod = false;
1873            Iterator<Integer> candidates = gcCandidateSet.iterator();
1874            while (candidates.hasNext()) {
1875                Integer candidate = candidates.next();
1876                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1877                if (referencedFileIds != null) {
1878                    for (Integer referencedFileId : referencedFileIds) {
1879                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1880                            // active file that is not targeted for deletion is referenced so don't delete
1881                            candidates.remove();
1882                            break;
1883                        }
1884                    }
1885                    if (gcCandidateSet.contains(candidate)) {
1886                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1887                    } else {
1888                        if (LOG.isTraceEnabled()) {
1889                            LOG.trace("not removing data file: " + candidate
1890                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1891                        }
1892                    }
1893                }
1894            }
1895
1896            if (!gcCandidateSet.isEmpty()) {
1897                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1898                journal.removeDataFiles(gcCandidateSet);
1899                for (Integer candidate : gcCandidateSet) {
1900                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1901                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1902                    }
1903                }
1904                if (ackMessageFileMapMod) {
1905                    checkpointUpdate(tx, false);
1906                }
1907            } else if (isEnableAckCompaction()) {
1908                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1909                    // First check length of journal to make sure it makes sense to even try.
1910                    //
1911                    // If there is only one journal file with Acks in it we don't need to move
1912                    // it since it won't be chained to any later logs.
1913                    //
1914                    // If the logs haven't grown since the last time then we need to compact
1915                    // otherwise there seems to still be room for growth and we don't need to incur
1916                    // the overhead.  Depending on configuration this check can be avoided and
1917                    // Ack compaction will run any time the store has not GC'd a journal file in
1918                    // the configured amount of cycles.
1919                    if (metadata.ackMessageFileMap.size() > 1 &&
1920                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1921
1922                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
1923                        try {
1924                            scheduler.execute(new AckCompactionRunner());
1925                        } catch (Exception ex) {
1926                            LOG.warn("Error on queueing the Ack Compactor", ex);
1927                        }
1928                    } else {
1929                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1930                    }
1931
1932                    checkPointCyclesWithNoGC = 0;
1933                } else {
1934                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1935                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1936                }
1937
1938                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
1939            }
1940        }
1941        MDC.remove("activemq.persistenceDir");
1942
1943        LOG.debug("Checkpoint done.");
1944    }
1945
1946    private final class AckCompactionRunner implements Runnable {
1947
1948        @Override
1949        public void run() {
1950
1951            int journalToAdvance = -1;
1952            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
1953
1954            //flag to know whether the ack forwarding completed without an exception
1955            boolean forwarded = false;
1956
1957            try {
1958                //acquire the checkpoint lock to prevent other threads from
1959                //running a checkpoint while this is running
1960                //
1961                //Normally this task runs on the same executor as the checkpoint task
1962                //so this ack compaction runner wouldn't run at the same time as the checkpoint task.
1963                //
1964                //However, there are two cases where this isn't always true.
1965                //First, the checkpoint() method is public and can be called through the
1966                //PersistenceAdapter interface by someone at the same time this is running.
1967                //Second, a checkpoint is called during shutdown without using the executor.
1968                //
1969                //In the future it might be better to just remove the checkpointLock entirely
1970                //and only use the executor but this would need to be examined for any unintended
1971                //consequences
1972                checkpointLock.readLock().lock();
1973
1974                try {
1975
1976                    // Lock index to capture the ackMessageFileMap data
1977                    indexLock.writeLock().lock();
1978
1979                    // Map keys might not be sorted, find the earliest log file to forward acks
1980                    // from and move only those, future cycles can chip away at more as needed.
1981                    // We won't move files that are themselves rewritten on a previous compaction.
1982                    List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
1983                    Collections.sort(journalFileIds);
1984                    for (Integer journalFileId : journalFileIds) {
1985                        DataFile current = journal.getDataFileById(journalFileId);
1986                        if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
1987                            journalToAdvance = journalFileId;
1988                            break;
1989                        }
1990                    }
1991
1992                    // Check if we found one, or if we only found the current file being written to.
1993                    if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
1994                        return;
1995                    }
1996
1997                    journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
1998
1999                } finally {
2000                    indexLock.writeLock().unlock();
2001                }
2002
2003                try {
2004                    // Background rewrite of the old acks
2005                    forwardAllAcks(journalToAdvance, journalLogsReferenced);
2006                    forwarded = true;
2007                } catch (IOException ioe) {
2008                    LOG.error("Forwarding of acks failed", ioe);
2009                    brokerService.handleIOException(ioe);
2010                } catch (Throwable e) {
2011                    LOG.error("Forwarding of acks failed", e);
2012                    brokerService.handleIOException(IOExceptionSupport.create(e));
2013                }
2014            } finally {
2015                checkpointLock.readLock().unlock();
2016            }
2017
2018            try {
2019                if (forwarded) {
2020                    // Checkpoint with changes from the ackMessageFileMap
2021                    checkpointUpdate(false);
2022                }
2023            } catch (IOException ioe) {
2024                LOG.error("Checkpoint failed", ioe);
2025                brokerService.handleIOException(ioe);
2026            } catch (Throwable e) {
2027                LOG.error("Checkpoint failed", e);
2028                brokerService.handleIOException(IOExceptionSupport.create(e));
2029            }
2030        }
2031    }
2032
2033    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
2034        LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
2035
2036        DataFile forwardsFile = journal.reserveDataFile();
2037        forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
2038        LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
2039
2040        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
2041
2042        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
2043            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
2044            compactionMarker.setSourceDataFileId(journalToRead);
2045            compactionMarker.setRewriteType(forwardsFile.getTypeCode());
2046
2047            ByteSequence payload = toByteSequence(compactionMarker);
2048            appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2049            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
2050
2051            Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0));
2052            while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
2053                JournalCommand<?> command = null;
2054                try {
2055                    command = load(nextLocation);
2056                } catch (IOException ex) {
2057                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
2058                }
2059
2060                if (command != null && command instanceof KahaRemoveMessageCommand) {
2061                    payload = toByteSequence(command);
2062                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2063                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
2064                }
2065
2066                nextLocation = getNextLocationForAckForward(nextLocation);
2067            }
2068        }
2069
2070        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
2071
2072        // Lock index while we update the ackMessageFileMap.
2073        indexLock.writeLock().lock();
2074
2075        // Update the ack map with the new locations of the acks
2076        for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
2077            Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
2078            if (referenceFileIds == null) {
2079                referenceFileIds = new HashSet<Integer>();
2080                referenceFileIds.addAll(entry.getValue());
2081                metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
2082            } else {
2083                referenceFileIds.addAll(entry.getValue());
2084            }
2085        }
2086
2087        // remove the old location data from the ack map so that the old journal log file can
2088        // be removed on next GC.
2089        metadata.ackMessageFileMap.remove(journalToRead);
2090
2091        indexLock.writeLock().unlock();
2092
2093        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
2094    }
2095
2096    private Location getNextLocationForAckForward(final Location nextLocation) {
2097        //getNextLocation() can throw an IOException, we should handle it and set
2098        //nextLocation to null and abort gracefully
2099        //Should not happen in the normal case
2100        Location location = null;
2101        try {
2102            location = journal.getNextLocation(nextLocation);
2103        } catch (IOException e) {
2104            LOG.warn("Failed to load next journal location: {}", e.getMessage());
2105            if (LOG.isDebugEnabled()) {
2106                LOG.debug("Failed to load next journal location", e);
2107            }
2108        }
2109        return location;
2110    }
2111
2112    final Runnable nullCompletionCallback = new Runnable() {
2113        @Override
2114        public void run() {
2115        }
2116    };
2117
2118    private Location checkpointProducerAudit() throws IOException {
2119        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
2120            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2121            ObjectOutputStream oout = new ObjectOutputStream(baos);
2122            oout.writeObject(metadata.producerSequenceIdTracker);
2123            oout.flush();
2124            oout.close();
2125            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2126            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2127            try {
2128                location.getLatch().await();
2129            } catch (InterruptedException e) {
2130                throw new InterruptedIOException(e.toString());
2131            }
2132            return location;
2133        }
2134        return metadata.producerSequenceIdTrackerLocation;
2135    }
2136
2137    private Location checkpointAckMessageFileMap() throws IOException {
2138        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2139        ObjectOutputStream oout = new ObjectOutputStream(baos);
2140        oout.writeObject(metadata.ackMessageFileMap);
2141        oout.flush();
2142        oout.close();
2143        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2144        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2145        try {
2146            location.getLatch().await();
2147        } catch (InterruptedException e) {
2148            throw new InterruptedIOException(e.toString());
2149        }
2150        return location;
2151    }
2152
2153    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2154
2155        ByteSequence sequence = toByteSequence(subscription);
2156        Location location = journal.write(sequence, nullCompletionCallback) ;
2157
2158        try {
2159            location.getLatch().await();
2160        } catch (InterruptedException e) {
2161            throw new InterruptedIOException(e.toString());
2162        }
2163        return location;
2164    }
2165
2166    public HashSet<Integer> getJournalFilesBeingReplicated() {
2167        return journalFilesBeingReplicated;
2168    }
2169
2170    // /////////////////////////////////////////////////////////////////
2171    // StoredDestination related implementation methods.
2172    // /////////////////////////////////////////////////////////////////
2173
2174    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
2175
2176    static class MessageKeys {
2177        final String messageId;
2178        final Location location;
2179
2180        public MessageKeys(String messageId, Location location) {
2181            this.messageId=messageId;
2182            this.location=location;
2183        }
2184
2185        @Override
2186        public String toString() {
2187            return "["+messageId+","+location+"]";
2188        }
2189    }
2190
2191    protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2192        final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
2193
2194        @Override
2195        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2196            return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
2197        }
2198
2199        @Override
2200        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2201            dataOut.writeUTF(object.messageId);
2202            locationSizeMarshaller.writePayload(object.location, dataOut);
2203        }
2204    }
2205
2206    class LastAck {
2207        long lastAckedSequence;
2208        byte priority;
2209
2210        public LastAck(LastAck source) {
2211            this.lastAckedSequence = source.lastAckedSequence;
2212            this.priority = source.priority;
2213        }
2214
2215        public LastAck() {
2216            this.priority = MessageOrderIndex.HI;
2217        }
2218
2219        public LastAck(long ackLocation) {
2220            this.lastAckedSequence = ackLocation;
2221            this.priority = MessageOrderIndex.LO;
2222        }
2223
2224        public LastAck(long ackLocation, byte priority) {
2225            this.lastAckedSequence = ackLocation;
2226            this.priority = priority;
2227        }
2228
2229        @Override
2230        public String toString() {
2231            return "[" + lastAckedSequence + ":" + priority + "]";
2232        }
2233    }
2234
2235    protected class LastAckMarshaller implements Marshaller<LastAck> {
2236
2237        @Override
2238        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2239            dataOut.writeLong(object.lastAckedSequence);
2240            dataOut.writeByte(object.priority);
2241        }
2242
2243        @Override
2244        public LastAck readPayload(DataInput dataIn) throws IOException {
2245            LastAck lastAcked = new LastAck();
2246            lastAcked.lastAckedSequence = dataIn.readLong();
2247            if (metadata.version >= 3) {
2248                lastAcked.priority = dataIn.readByte();
2249            }
2250            return lastAcked;
2251        }
2252
2253        @Override
2254        public int getFixedSize() {
2255            return 9;
2256        }
2257
2258        @Override
2259        public LastAck deepCopy(LastAck source) {
2260            return new LastAck(source);
2261        }
2262
2263        @Override
2264        public boolean isDeepCopySupported() {
2265            return true;
2266        }
2267    }
2268
2269    class StoredDestination {
2270
2271        MessageOrderIndex orderIndex = new MessageOrderIndex();
2272        BTreeIndex<Location, Long> locationIndex;
2273        BTreeIndex<String, Long> messageIdIndex;
2274
2275        // These bits are only set for Topics
2276        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
2277        BTreeIndex<String, LastAck> subscriptionAcks;
2278        HashMap<String, MessageOrderCursor> subscriptionCursors;
2279        ListIndex<String, SequenceSet> ackPositions;
2280        ListIndex<String, Location> subLocations;
2281
2282        // Transient data used to track which Messages are no longer needed.
2283        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
2284        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
2285
2286        public void trackPendingAdd(Long seq) {
2287            orderIndex.trackPendingAdd(seq);
2288        }
2289
2290        public void trackPendingAddComplete(Long seq) {
2291            orderIndex.trackPendingAddComplete(seq);
2292        }
2293
2294        @Override
2295        public String toString() {
2296            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
2297        }
2298    }
2299
2300    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
2301
2302        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
2303
2304        @Override
2305        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
2306            final StoredDestination value = new StoredDestination();
2307            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2308            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
2309            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
2310
2311            if (dataIn.readBoolean()) {
2312                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
2313                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
2314                if (metadata.version >= 4) {
2315                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
2316                } else {
2317                    // upgrade
2318                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2319                        @Override
2320                        public void execute(Transaction tx) throws IOException {
2321                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
2322
2323                            if (metadata.version >= 3) {
2324                                // migrate
2325                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
2326                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
2327                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
2328                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
2329                                oldAckPositions.load(tx);
2330
2331
2332                                // Do the initial build of the data in memory before writing into the store
2333                                // based Ack Positions List to avoid a lot of disk thrashing.
2334                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
2335                                while (iterator.hasNext()) {
2336                                    Entry<Long, HashSet<String>> entry = iterator.next();
2337
2338                                    for(String subKey : entry.getValue()) {
2339                                        SequenceSet pendingAcks = temp.get(subKey);
2340                                        if (pendingAcks == null) {
2341                                            pendingAcks = new SequenceSet();
2342                                            temp.put(subKey, pendingAcks);
2343                                        }
2344
2345                                        pendingAcks.add(entry.getKey());
2346                                    }
2347                                }
2348                            }
2349                            // Now move the pending messages to ack data into the store backed
2350                            // structure.
2351                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2352                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2353                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2354                            value.ackPositions.load(tx);
2355                            for(String subscriptionKey : temp.keySet()) {
2356                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
2357                            }
2358
2359                        }
2360                    });
2361                }
2362
2363                if (metadata.version >= 5) {
2364                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
2365                } else {
2366                    // upgrade
2367                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2368                        @Override
2369                        public void execute(Transaction tx) throws IOException {
2370                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2371                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2372                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2373                            value.subLocations.load(tx);
2374                        }
2375                    });
2376                }
2377            }
2378            if (metadata.version >= 2) {
2379                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2380                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2381            } else {
2382                // upgrade
2383                pageFile.tx().execute(new Transaction.Closure<IOException>() {
2384                    @Override
2385                    public void execute(Transaction tx) throws IOException {
2386                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2387                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2388                        value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2389                        value.orderIndex.lowPriorityIndex.load(tx);
2390
2391                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2392                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2393                        value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2394                        value.orderIndex.highPriorityIndex.load(tx);
2395                    }
2396                });
2397            }
2398
2399            return value;
2400        }
2401
2402        @Override
2403        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
2404            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
2405            dataOut.writeLong(value.locationIndex.getPageId());
2406            dataOut.writeLong(value.messageIdIndex.getPageId());
2407            if (value.subscriptions != null) {
2408                dataOut.writeBoolean(true);
2409                dataOut.writeLong(value.subscriptions.getPageId());
2410                dataOut.writeLong(value.subscriptionAcks.getPageId());
2411                dataOut.writeLong(value.ackPositions.getHeadPageId());
2412                dataOut.writeLong(value.subLocations.getHeadPageId());
2413            } else {
2414                dataOut.writeBoolean(false);
2415            }
2416            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
2417            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
2418        }
2419    }
2420
2421    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2422        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2423
2424        @Override
2425        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2426            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2427            rc.mergeFramed((InputStream)dataIn);
2428            return rc;
2429        }
2430
2431        @Override
2432        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2433            object.writeFramed((OutputStream)dataOut);
2434        }
2435    }
2436
2437    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2438        String key = key(destination);
2439        StoredDestination rc = storedDestinations.get(key);
2440        if (rc == null) {
2441            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2442            rc = loadStoredDestination(tx, key, topic);
2443            // Cache it. We may want to remove/unload destinations from the
2444            // cache that are not used for a while
2445            // to reduce memory usage.
2446            storedDestinations.put(key, rc);
2447        }
2448        return rc;
2449    }
2450
2451    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2452        String key = key(destination);
2453        StoredDestination rc = storedDestinations.get(key);
2454        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2455            rc = getStoredDestination(destination, tx);
2456        }
2457        return rc;
2458    }
2459
2460    /**
2461     * @param tx
2462     * @param key
2463     * @param topic
2464     * @return
2465     * @throws IOException
2466     */
2467    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2468        // Try to load the existing indexes..
2469        StoredDestination rc = metadata.destinations.get(tx, key);
2470        if (rc == null) {
2471            // Brand new destination.. allocate indexes for it.
2472            rc = new StoredDestination();
2473            rc.orderIndex.allocate(tx);
2474            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
2475            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
2476
2477            if (topic) {
2478                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
2479                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
2480                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2481                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2482            }
2483            metadata.destinations.put(tx, key, rc);
2484        }
2485
2486        // Configure the marshalers and load.
2487        rc.orderIndex.load(tx);
2488
2489        // Figure out the next key using the last entry in the destination.
2490        rc.orderIndex.configureLast(tx);
2491
2492        rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
2493        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2494        rc.locationIndex.load(tx);
2495
2496        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2497        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2498        rc.messageIdIndex.load(tx);
2499
2500        //go through an upgrade old index if older than version 6
2501        if (metadata.version < 6) {
2502            for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) {
2503                Entry<Location, Long> entry = iterator.next();
2504                // modify so it is upgraded
2505                rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
2506            }
2507            //upgrade the order index
2508            for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) {
2509                Entry<Long, MessageKeys> entry = iterator.next();
2510                //call get so that the last priority is updated
2511                rc.orderIndex.get(tx, entry.getKey());
2512                rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
2513            }
2514        }
2515
2516        // If it was a topic...
2517        if (topic) {
2518
2519            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2520            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2521            rc.subscriptions.load(tx);
2522
2523            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2524            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2525            rc.subscriptionAcks.load(tx);
2526
2527            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2528            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2529            rc.ackPositions.load(tx);
2530
2531            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2532            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2533            rc.subLocations.load(tx);
2534
2535            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
2536
2537            if (metadata.version < 3) {
2538
2539                // on upgrade need to fill ackLocation with available messages past last ack
2540                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2541                    Entry<String, LastAck> entry = iterator.next();
2542                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2543                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2544                        Long sequence = orderIterator.next().getKey();
2545                        addAckLocation(tx, rc, sequence, entry.getKey());
2546                    }
2547                    // modify so it is upgraded
2548                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2549                }
2550            }
2551
2552            // Configure the message references index
2553            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2554            while (subscriptions.hasNext()) {
2555                Entry<String, SequenceSet> subscription = subscriptions.next();
2556                SequenceSet pendingAcks = subscription.getValue();
2557                if (pendingAcks != null && !pendingAcks.isEmpty()) {
2558                    Long lastPendingAck = pendingAcks.getTail().getLast();
2559                    for (Long sequenceId : pendingAcks) {
2560                        Long current = rc.messageReferences.get(sequenceId);
2561                        if (current == null) {
2562                            current = new Long(0);
2563                        }
2564
2565                        // We always add a trailing empty entry for the next position to start from
2566                        // so we need to ensure we don't count that as a message reference on reload.
2567                        if (!sequenceId.equals(lastPendingAck)) {
2568                            current = current.longValue() + 1;
2569                        } else {
2570                            current = Long.valueOf(0L);
2571                        }
2572
2573                        rc.messageReferences.put(sequenceId, current);
2574                    }
2575                }
2576            }
2577
2578            // Configure the subscription cache
2579            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2580                Entry<String, LastAck> entry = iterator.next();
2581                rc.subscriptionCache.add(entry.getKey());
2582            }
2583
2584            if (rc.orderIndex.nextMessageId == 0) {
2585                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2586                if (!rc.subscriptionAcks.isEmpty(tx)) {
2587                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2588                        Entry<String, LastAck> entry = iterator.next();
2589                        rc.orderIndex.nextMessageId =
2590                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2591                    }
2592                }
2593            } else {
2594                // update based on ackPositions for unmatched, last entry is always the next
2595                if (!rc.messageReferences.isEmpty()) {
2596                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
2597                    rc.orderIndex.nextMessageId =
2598                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
2599                }
2600            }
2601        }
2602
2603        if (metadata.version < VERSION) {
2604            // store again after upgrade
2605            metadata.destinations.put(tx, key, rc);
2606        }
2607        return rc;
2608    }
2609
2610    /**
2611     * Clear the counter for the destination, if one exists.
2612     *
2613     * @param kahaDestination
2614     */
2615    protected void clearStoreStats(KahaDestination kahaDestination) {
2616        String key = key(kahaDestination);
2617        MessageStoreStatistics storeStats = getStoreStats(key);
2618        MessageStoreSubscriptionStatistics subStats = getSubStats(key);
2619        if (storeStats != null) {
2620            storeStats.reset();
2621        }
2622        if (subStats != null) {
2623            subStats.reset();
2624        }
2625    }
2626
2627    /**
2628     * Update MessageStoreStatistics
2629     *
2630     * @param kahaDestination
2631     * @param size
2632     */
2633    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
2634        incrementAndAddSizeToStoreStat(key(kahaDestination), size);
2635    }
2636
2637    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
2638        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2639        if (storeStats != null) {
2640            storeStats.getMessageCount().increment();
2641            if (size > 0) {
2642                storeStats.getMessageSize().addSize(size);
2643            }
2644        }
2645    }
2646
2647    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
2648        decrementAndSubSizeToStoreStat(key(kahaDestination), size);
2649    }
2650
2651    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
2652        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2653        if (storeStats != null) {
2654            storeStats.getMessageCount().decrement();
2655            if (size > 0) {
2656                storeStats.getMessageSize().addSize(-size);
2657            }
2658        }
2659    }
2660
2661    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
2662        incrementAndAddSizeToStoreStat(key(kahaDestination), subKey, size);
2663    }
2664
2665    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size) {
2666        if (enableSubscriptionStatistics) {
2667            MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
2668            if (subStats != null && subKey != null) {
2669                subStats.getMessageCount(subKey).increment();
2670                if (size > 0) {
2671                    subStats.getMessageSize(subKey).addSize(size);
2672                }
2673            }
2674        }
2675    }
2676
2677
2678    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size) {
2679        if (enableSubscriptionStatistics) {
2680            MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
2681            if (subStats != null && subKey != null) {
2682                subStats.getMessageCount(subKey).decrement();
2683                if (size > 0) {
2684                    subStats.getMessageSize(subKey).addSize(-size);
2685                }
2686            }
2687        }
2688    }
2689
2690    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
2691        decrementAndSubSizeToStoreStat(key(kahaDestination), subKey, size);
2692    }
2693
2694    /**
2695     * This is a map to cache MessageStores for a specific
2696     * KahaDestination key
2697     */
2698    protected final ConcurrentMap<String, MessageStore> storeCache =
2699            new ConcurrentHashMap<String, MessageStore>();
2700
2701    /**
2702     * Locate the storeMessageSize counter for this KahaDestination
2703     * @param kahaDestination
2704     * @return
2705     */
2706    protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
2707        MessageStoreStatistics storeStats = null;
2708        try {
2709            MessageStore messageStore = storeCache.get(kahaDestKey);
2710            if (messageStore != null) {
2711                storeStats = messageStore.getMessageStoreStatistics();
2712            }
2713        } catch (Exception e1) {
2714             LOG.error("Getting size counter of destination failed", e1);
2715        }
2716
2717        return storeStats;
2718    }
2719
2720    protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) {
2721        MessageStoreSubscriptionStatistics subStats = null;
2722        try {
2723            MessageStore messageStore = storeCache.get(kahaDestKey);
2724            if (messageStore instanceof TopicMessageStore) {
2725                subStats = ((TopicMessageStore)messageStore).getMessageStoreSubStatistics();
2726            }
2727        } catch (Exception e1) {
2728             LOG.error("Getting size counter of destination failed", e1);
2729        }
2730
2731        return subStats;
2732    }
2733
2734    /**
2735     * Determine whether this Destination matches the DestinationType
2736     *
2737     * @param destination
2738     * @param type
2739     * @return
2740     */
2741    protected boolean matchType(Destination destination,
2742            KahaDestination.DestinationType type) {
2743        if (destination instanceof Topic
2744                && type.equals(KahaDestination.DestinationType.TOPIC)) {
2745            return true;
2746        } else if (destination instanceof Queue
2747                && type.equals(KahaDestination.DestinationType.QUEUE)) {
2748            return true;
2749        }
2750        return false;
2751    }
2752
2753    class LocationSizeMarshaller implements Marshaller<Location> {
2754
2755        public LocationSizeMarshaller() {
2756
2757        }
2758
2759        @Override
2760        public Location readPayload(DataInput dataIn) throws IOException {
2761            Location rc = new Location();
2762            rc.setDataFileId(dataIn.readInt());
2763            rc.setOffset(dataIn.readInt());
2764            if (metadata.version >= 6) {
2765                rc.setSize(dataIn.readInt());
2766            }
2767            return rc;
2768        }
2769
2770        @Override
2771        public void writePayload(Location object, DataOutput dataOut)
2772                throws IOException {
2773            dataOut.writeInt(object.getDataFileId());
2774            dataOut.writeInt(object.getOffset());
2775            dataOut.writeInt(object.getSize());
2776        }
2777
2778        @Override
2779        public int getFixedSize() {
2780            return 12;
2781        }
2782
2783        @Override
2784        public Location deepCopy(Location source) {
2785            return new Location(source);
2786        }
2787
2788        @Override
2789        public boolean isDeepCopySupported() {
2790            return true;
2791        }
2792    }
2793
2794    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2795        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2796        if (sequences == null) {
2797            sequences = new SequenceSet();
2798            sequences.add(messageSequence);
2799            sd.ackPositions.add(tx, subscriptionKey, sequences);
2800        } else {
2801            sequences.add(messageSequence);
2802            sd.ackPositions.put(tx, subscriptionKey, sequences);
2803        }
2804
2805        Long count = sd.messageReferences.get(messageSequence);
2806        if (count == null) {
2807            count = Long.valueOf(0L);
2808        }
2809        count = count.longValue() + 1;
2810        sd.messageReferences.put(messageSequence, count);
2811    }
2812
2813    // new sub is interested in potentially all existing messages
2814    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2815        SequenceSet allOutstanding = new SequenceSet();
2816        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2817        while (iterator.hasNext()) {
2818            SequenceSet set = iterator.next().getValue();
2819            for (Long entry : set) {
2820                allOutstanding.add(entry);
2821            }
2822        }
2823        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2824
2825        for (Long ackPosition : allOutstanding) {
2826            Long count = sd.messageReferences.get(ackPosition);
2827
2828            // There might not be a reference if the ackLocation was the last
2829            // one which is a placeholder for the next incoming message and
2830            // no value was added to the message references table.
2831            if (count != null) {
2832                count = count.longValue() + 1;
2833                sd.messageReferences.put(ackPosition, count);
2834            }
2835        }
2836    }
2837
2838    // on a new message add, all existing subs are interested in this message
2839    private void addAckLocationForNewMessage(Transaction tx, KahaDestination kahaDest,
2840            StoredDestination sd, Long messageSequence) throws IOException {
2841        for(String subscriptionKey : sd.subscriptionCache) {
2842            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2843            if (sequences == null) {
2844                sequences = new SequenceSet();
2845                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2846                sd.ackPositions.add(tx, subscriptionKey, sequences);
2847            } else {
2848                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2849                sd.ackPositions.put(tx, subscriptionKey, sequences);
2850            }
2851
2852            MessageKeys key = sd.orderIndex.get(tx, messageSequence);
2853            incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey,
2854                    key.location.getSize());
2855
2856            Long count = sd.messageReferences.get(messageSequence);
2857            if (count == null) {
2858                count = Long.valueOf(0L);
2859            }
2860            count = count.longValue() + 1;
2861            sd.messageReferences.put(messageSequence, count);
2862            sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
2863        }
2864    }
2865
2866    private void removeAckLocationsForSub(KahaSubscriptionCommand command,
2867            Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2868        if (!sd.ackPositions.isEmpty(tx)) {
2869            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2870            if (sequences == null || sequences.isEmpty()) {
2871                return;
2872            }
2873
2874            ArrayList<Long> unreferenced = new ArrayList<Long>();
2875
2876            for(Long sequenceId : sequences) {
2877                Long references = sd.messageReferences.get(sequenceId);
2878                if (references != null) {
2879                    references = references.longValue() - 1;
2880
2881                    if (references.longValue() > 0) {
2882                        sd.messageReferences.put(sequenceId, references);
2883                    } else {
2884                        sd.messageReferences.remove(sequenceId);
2885                        unreferenced.add(sequenceId);
2886                    }
2887                }
2888            }
2889
2890            for(Long sequenceId : unreferenced) {
2891                // Find all the entries that need to get deleted.
2892                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2893                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2894
2895                // Do the actual deletes.
2896                for (Entry<Long, MessageKeys> entry : deletes) {
2897                    sd.locationIndex.remove(tx, entry.getValue().location);
2898                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2899                    sd.orderIndex.remove(tx, entry.getKey());
2900                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2901                }
2902            }
2903        }
2904    }
2905
2906    /**
2907     * @param tx
2908     * @param sd
2909     * @param subscriptionKey
2910     * @param messageSequence
2911     * @throws IOException
2912     */
2913    private void removeAckLocation(KahaRemoveMessageCommand command,
2914            Transaction tx, StoredDestination sd, String subscriptionKey,
2915            Long messageSequence) throws IOException {
2916        // Remove the sub from the previous location set..
2917        if (messageSequence != null) {
2918            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2919            if (range != null && !range.isEmpty()) {
2920                range.remove(messageSequence);
2921                if (!range.isEmpty()) {
2922                    sd.ackPositions.put(tx, subscriptionKey, range);
2923                } else {
2924                    sd.ackPositions.remove(tx, subscriptionKey);
2925                }
2926
2927                MessageKeys key = sd.orderIndex.get(tx, messageSequence);
2928                decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey,
2929                        key.location.getSize());
2930
2931                // Check if the message is reference by any other subscription.
2932                Long count = sd.messageReferences.get(messageSequence);
2933                if (count != null) {
2934                    long references = count.longValue() - 1;
2935                    if (references > 0) {
2936                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2937                        return;
2938                    } else {
2939                        sd.messageReferences.remove(messageSequence);
2940                    }
2941                }
2942
2943                // Find all the entries that need to get deleted.
2944                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2945                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2946
2947                // Do the actual deletes.
2948                for (Entry<Long, MessageKeys> entry : deletes) {
2949                    sd.locationIndex.remove(tx, entry.getValue().location);
2950                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2951                    sd.orderIndex.remove(tx, entry.getKey());
2952                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2953                }
2954            }
2955        }
2956    }
2957
2958    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2959        return sd.subscriptionAcks.get(tx, subscriptionKey);
2960    }
2961
2962    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2963        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2964        if (messageSequences != null) {
2965            long result = messageSequences.rangeSize();
2966            // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2967            return result > 0 ? result - 1 : 0;
2968        }
2969
2970        return 0;
2971    }
2972
2973    public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2974        //grab the messages attached to this subscription
2975        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2976
2977        long locationSize = 0;
2978        if (messageSequences != null) {
2979            Sequence head = messageSequences.getHead();
2980            if (head != null) {
2981                //get an iterator over the order index starting at the first unacked message
2982                //and go over each message to add up the size
2983                Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
2984                        new MessageOrderCursor(head.getFirst()));
2985
2986                while (iterator.hasNext()) {
2987                    Entry<Long, MessageKeys> entry = iterator.next();
2988                    locationSize += entry.getValue().location.getSize();
2989                }
2990            }
2991        }
2992
2993        return locationSize;
2994    }
2995
2996    protected String key(KahaDestination destination) {
2997        return destination.getType().getNumber() + ":" + destination.getName();
2998    }
2999
3000    // /////////////////////////////////////////////////////////////////
3001    // Transaction related implementation methods.
3002    // /////////////////////////////////////////////////////////////////
3003    @SuppressWarnings("rawtypes")
3004    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
3005    @SuppressWarnings("rawtypes")
3006    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
3007    protected final Set<String> ackedAndPrepared = new HashSet<String>();
3008    protected final Set<String> rolledBackAcks = new HashSet<String>();
3009
3010    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
3011    // till then they are skipped by the store.
3012    // 'at most once' XA guarantee
3013    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
3014        this.indexLock.writeLock().lock();
3015        try {
3016            for (MessageAck ack : acks) {
3017                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
3018            }
3019        } finally {
3020            this.indexLock.writeLock().unlock();
3021        }
3022    }
3023
3024    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
3025        if (acks != null) {
3026            this.indexLock.writeLock().lock();
3027            try {
3028                for (MessageAck ack : acks) {
3029                    final String id = ack.getLastMessageId().toProducerKey();
3030                    ackedAndPrepared.remove(id);
3031                    if (rollback) {
3032                        rolledBackAcks.add(id);
3033                    }
3034                }
3035            } finally {
3036                this.indexLock.writeLock().unlock();
3037            }
3038        }
3039    }
3040
3041    @SuppressWarnings("rawtypes")
3042    private List<Operation> getInflightTx(KahaTransactionInfo info) {
3043        TransactionId key = TransactionIdConversion.convert(info);
3044        List<Operation> tx;
3045        synchronized (inflightTransactions) {
3046            tx = inflightTransactions.get(key);
3047            if (tx == null) {
3048                tx = Collections.synchronizedList(new ArrayList<Operation>());
3049                inflightTransactions.put(key, tx);
3050            }
3051        }
3052        return tx;
3053    }
3054
3055    @SuppressWarnings("unused")
3056    private TransactionId key(KahaTransactionInfo transactionInfo) {
3057        return TransactionIdConversion.convert(transactionInfo);
3058    }
3059
3060    abstract class Operation <T extends JournalCommand<T>> {
3061        final T command;
3062        final Location location;
3063
3064        public Operation(T command, Location location) {
3065            this.command = command;
3066            this.location = location;
3067        }
3068
3069        public Location getLocation() {
3070            return location;
3071        }
3072
3073        public T getCommand() {
3074            return command;
3075        }
3076
3077        abstract public void execute(Transaction tx) throws IOException;
3078    }
3079
3080    class AddOperation extends Operation<KahaAddMessageCommand> {
3081        final IndexAware runWithIndexLock;
3082        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
3083            super(command, location);
3084            this.runWithIndexLock = runWithIndexLock;
3085        }
3086
3087        @Override
3088        public void execute(Transaction tx) throws IOException {
3089            long seq = updateIndex(tx, command, location);
3090            if (runWithIndexLock != null) {
3091                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
3092            }
3093        }
3094    }
3095
3096    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
3097
3098        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
3099            super(command, location);
3100        }
3101
3102        @Override
3103        public void execute(Transaction tx) throws IOException {
3104            updateIndex(tx, command, location);
3105        }
3106    }
3107
3108    // /////////////////////////////////////////////////////////////////
3109    // Initialization related implementation methods.
3110    // /////////////////////////////////////////////////////////////////
3111
3112    private PageFile createPageFile() throws IOException {
3113        if (indexDirectory == null) {
3114            indexDirectory = directory;
3115        }
3116        IOHelper.mkdirs(indexDirectory);
3117        PageFile index = new PageFile(indexDirectory, "db");
3118        index.setEnableWriteThread(isEnableIndexWriteAsync());
3119        index.setWriteBatchSize(getIndexWriteBatchSize());
3120        index.setPageCacheSize(indexCacheSize);
3121        index.setUseLFRUEviction(isUseIndexLFRUEviction());
3122        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
3123        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
3124        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
3125        index.setEnablePageCaching(isEnableIndexPageCaching());
3126        return index;
3127    }
3128
3129    private Journal createJournal() throws IOException {
3130        Journal manager = new Journal();
3131        manager.setDirectory(directory);
3132        manager.setMaxFileLength(getJournalMaxFileLength());
3133        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
3134        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
3135        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
3136        manager.setArchiveDataLogs(isArchiveDataLogs());
3137        manager.setSizeAccumulator(journalSize);
3138        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
3139        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
3140        manager.setPreallocationStrategy(
3141                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
3142        manager.setJournalDiskSyncStrategy(
3143                Journal.JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase()));
3144        if (getDirectoryArchive() != null) {
3145            IOHelper.mkdirs(getDirectoryArchive());
3146            manager.setDirectoryArchive(getDirectoryArchive());
3147        }
3148        return manager;
3149    }
3150
3151    private Metadata createMetadata() {
3152        Metadata md = new Metadata();
3153        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
3154        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
3155        return md;
3156    }
3157
3158    protected abstract void configureMetadata();
3159
3160    public int getJournalMaxWriteBatchSize() {
3161        return journalMaxWriteBatchSize;
3162    }
3163
3164    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
3165        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
3166    }
3167
3168    public File getDirectory() {
3169        return directory;
3170    }
3171
3172    public void setDirectory(File directory) {
3173        this.directory = directory;
3174    }
3175
3176    public boolean isDeleteAllMessages() {
3177        return deleteAllMessages;
3178    }
3179
3180    public void setDeleteAllMessages(boolean deleteAllMessages) {
3181        this.deleteAllMessages = deleteAllMessages;
3182    }
3183
3184    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
3185        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
3186    }
3187
3188    public int getIndexWriteBatchSize() {
3189        return setIndexWriteBatchSize;
3190    }
3191
3192    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
3193        this.enableIndexWriteAsync = enableIndexWriteAsync;
3194    }
3195
3196    boolean isEnableIndexWriteAsync() {
3197        return enableIndexWriteAsync;
3198    }
3199
3200    /**
3201     * @deprecated use {@link #getJournalDiskSyncStrategy} instead
3202     * @return
3203     */
3204    public boolean isEnableJournalDiskSyncs() {
3205        return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals(
3206                journalDiskSyncStrategy.trim().toUpperCase());
3207    }
3208
3209    /**
3210     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
3211     * @param syncWrites
3212     */
3213    public void setEnableJournalDiskSyncs(boolean syncWrites) {
3214        if (syncWrites) {
3215            journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
3216        } else {
3217            journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER.name();
3218        }
3219    }
3220
3221    public String getJournalDiskSyncStrategy() {
3222        return journalDiskSyncStrategy;
3223    }
3224
3225    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
3226        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
3227    }
3228
3229    public long getJournalDiskSyncInterval() {
3230        return journalDiskSyncInterval;
3231    }
3232
3233    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
3234        this.journalDiskSyncInterval = journalDiskSyncInterval;
3235    }
3236
3237    public long getCheckpointInterval() {
3238        return checkpointInterval;
3239    }
3240
3241    public void setCheckpointInterval(long checkpointInterval) {
3242        this.checkpointInterval = checkpointInterval;
3243    }
3244
3245    public long getCleanupInterval() {
3246        return cleanupInterval;
3247    }
3248
3249    public void setCleanupInterval(long cleanupInterval) {
3250        this.cleanupInterval = cleanupInterval;
3251    }
3252
3253    public void setJournalMaxFileLength(int journalMaxFileLength) {
3254        this.journalMaxFileLength = journalMaxFileLength;
3255    }
3256
3257    public int getJournalMaxFileLength() {
3258        return journalMaxFileLength;
3259    }
3260
3261    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
3262        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
3263    }
3264
3265    public int getMaxFailoverProducersToTrack() {
3266        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
3267    }
3268
3269    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
3270        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
3271    }
3272
3273    public int getFailoverProducersAuditDepth() {
3274        return this.metadata.producerSequenceIdTracker.getAuditDepth();
3275    }
3276
3277    public PageFile getPageFile() throws IOException {
3278        if (pageFile == null) {
3279            pageFile = createPageFile();
3280        }
3281        return pageFile;
3282    }
3283
3284    public Journal getJournal() throws IOException {
3285        if (journal == null) {
3286            journal = createJournal();
3287        }
3288        return journal;
3289    }
3290
3291    protected Metadata getMetadata() {
3292        return metadata;
3293    }
3294
3295    public boolean isFailIfDatabaseIsLocked() {
3296        return failIfDatabaseIsLocked;
3297    }
3298
3299    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
3300        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
3301    }
3302
3303    public boolean isIgnoreMissingJournalfiles() {
3304        return ignoreMissingJournalfiles;
3305    }
3306
3307    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
3308        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
3309    }
3310
3311    public int getIndexCacheSize() {
3312        return indexCacheSize;
3313    }
3314
3315    public void setIndexCacheSize(int indexCacheSize) {
3316        this.indexCacheSize = indexCacheSize;
3317    }
3318
3319    public boolean isCheckForCorruptJournalFiles() {
3320        return checkForCorruptJournalFiles;
3321    }
3322
3323    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
3324        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
3325    }
3326
3327    public boolean isChecksumJournalFiles() {
3328        return checksumJournalFiles;
3329    }
3330
3331    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
3332        this.checksumJournalFiles = checksumJournalFiles;
3333    }
3334
3335    @Override
3336    public void setBrokerService(BrokerService brokerService) {
3337        this.brokerService = brokerService;
3338    }
3339
3340    /**
3341     * @return the archiveDataLogs
3342     */
3343    public boolean isArchiveDataLogs() {
3344        return this.archiveDataLogs;
3345    }
3346
3347    /**
3348     * @param archiveDataLogs the archiveDataLogs to set
3349     */
3350    public void setArchiveDataLogs(boolean archiveDataLogs) {
3351        this.archiveDataLogs = archiveDataLogs;
3352    }
3353
3354    /**
3355     * @return the directoryArchive
3356     */
3357    public File getDirectoryArchive() {
3358        return this.directoryArchive;
3359    }
3360
3361    /**
3362     * @param directoryArchive the directoryArchive to set
3363     */
3364    public void setDirectoryArchive(File directoryArchive) {
3365        this.directoryArchive = directoryArchive;
3366    }
3367
3368    public boolean isArchiveCorruptedIndex() {
3369        return archiveCorruptedIndex;
3370    }
3371
3372    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
3373        this.archiveCorruptedIndex = archiveCorruptedIndex;
3374    }
3375
3376    public float getIndexLFUEvictionFactor() {
3377        return indexLFUEvictionFactor;
3378    }
3379
3380    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
3381        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
3382    }
3383
3384    public boolean isUseIndexLFRUEviction() {
3385        return useIndexLFRUEviction;
3386    }
3387
3388    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
3389        this.useIndexLFRUEviction = useIndexLFRUEviction;
3390    }
3391
3392    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
3393        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
3394    }
3395
3396    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
3397        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
3398    }
3399
3400    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
3401        this.enableIndexPageCaching = enableIndexPageCaching;
3402    }
3403
3404    public boolean isEnableIndexDiskSyncs() {
3405        return enableIndexDiskSyncs;
3406    }
3407
3408    public boolean isEnableIndexRecoveryFile() {
3409        return enableIndexRecoveryFile;
3410    }
3411
3412    public boolean isEnableIndexPageCaching() {
3413        return enableIndexPageCaching;
3414    }
3415
3416    // /////////////////////////////////////////////////////////////////
3417    // Internal conversion methods.
3418    // /////////////////////////////////////////////////////////////////
3419
3420    class MessageOrderCursor{
3421        long defaultCursorPosition;
3422        long lowPriorityCursorPosition;
3423        long highPriorityCursorPosition;
3424        MessageOrderCursor(){
3425        }
3426
3427        MessageOrderCursor(long position){
3428            this.defaultCursorPosition=position;
3429            this.lowPriorityCursorPosition=position;
3430            this.highPriorityCursorPosition=position;
3431        }
3432
3433        MessageOrderCursor(MessageOrderCursor other){
3434            this.defaultCursorPosition=other.defaultCursorPosition;
3435            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3436            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3437        }
3438
3439        MessageOrderCursor copy() {
3440            return new MessageOrderCursor(this);
3441        }
3442
3443        void reset() {
3444            this.defaultCursorPosition=0;
3445            this.highPriorityCursorPosition=0;
3446            this.lowPriorityCursorPosition=0;
3447        }
3448
3449        void increment() {
3450            if (defaultCursorPosition!=0) {
3451                defaultCursorPosition++;
3452            }
3453            if (highPriorityCursorPosition!=0) {
3454                highPriorityCursorPosition++;
3455            }
3456            if (lowPriorityCursorPosition!=0) {
3457                lowPriorityCursorPosition++;
3458            }
3459        }
3460
3461        @Override
3462        public String toString() {
3463           return "MessageOrderCursor:[def:" + defaultCursorPosition
3464                   + ", low:" + lowPriorityCursorPosition
3465                   + ", high:" +  highPriorityCursorPosition + "]";
3466        }
3467
3468        public void sync(MessageOrderCursor other) {
3469            this.defaultCursorPosition=other.defaultCursorPosition;
3470            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3471            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3472        }
3473    }
3474
3475    class MessageOrderIndex {
3476        static final byte HI = 9;
3477        static final byte LO = 0;
3478        static final byte DEF = 4;
3479
3480        long nextMessageId;
3481        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3482        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3483        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3484        final MessageOrderCursor cursor = new MessageOrderCursor();
3485        Long lastDefaultKey;
3486        Long lastHighKey;
3487        Long lastLowKey;
3488        byte lastGetPriority;
3489        final List<Long> pendingAdditions = new LinkedList<Long>();
3490        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
3491
3492        MessageKeys remove(Transaction tx, Long key) throws IOException {
3493            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3494            if (result == null && highPriorityIndex!=null) {
3495                result = highPriorityIndex.remove(tx, key);
3496                if (result ==null && lowPriorityIndex!=null) {
3497                    result = lowPriorityIndex.remove(tx, key);
3498                }
3499            }
3500            return result;
3501        }
3502
3503        void load(Transaction tx) throws IOException {
3504            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3505            defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3506            defaultPriorityIndex.load(tx);
3507            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3508            lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3509            lowPriorityIndex.load(tx);
3510            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3511            highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3512            highPriorityIndex.load(tx);
3513        }
3514
3515        void allocate(Transaction tx) throws IOException {
3516            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3517            if (metadata.version >= 2) {
3518                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3519                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3520            }
3521        }
3522
3523        void configureLast(Transaction tx) throws IOException {
3524            // Figure out the next key using the last entry in the destination.
3525            TreeSet<Long> orderedSet = new TreeSet<Long>();
3526
3527            addLast(orderedSet, highPriorityIndex, tx);
3528            addLast(orderedSet, defaultPriorityIndex, tx);
3529            addLast(orderedSet, lowPriorityIndex, tx);
3530
3531            if (!orderedSet.isEmpty()) {
3532                nextMessageId = orderedSet.last() + 1;
3533            }
3534        }
3535
3536        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3537            if (index != null) {
3538                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3539                if (lastEntry != null) {
3540                    orderedSet.add(lastEntry.getKey());
3541                }
3542            }
3543        }
3544
3545        void clear(Transaction tx) throws IOException {
3546            this.remove(tx);
3547            this.resetCursorPosition();
3548            this.allocate(tx);
3549            this.load(tx);
3550            this.configureLast(tx);
3551        }
3552
3553        void remove(Transaction tx) throws IOException {
3554            defaultPriorityIndex.clear(tx);
3555            defaultPriorityIndex.unload(tx);
3556            tx.free(defaultPriorityIndex.getPageId());
3557            if (lowPriorityIndex != null) {
3558                lowPriorityIndex.clear(tx);
3559                lowPriorityIndex.unload(tx);
3560
3561                tx.free(lowPriorityIndex.getPageId());
3562            }
3563            if (highPriorityIndex != null) {
3564                highPriorityIndex.clear(tx);
3565                highPriorityIndex.unload(tx);
3566                tx.free(highPriorityIndex.getPageId());
3567            }
3568        }
3569
3570        void resetCursorPosition() {
3571            this.cursor.reset();
3572            lastDefaultKey = null;
3573            lastHighKey = null;
3574            lastLowKey = null;
3575        }
3576
3577        void setBatch(Transaction tx, Long sequence) throws IOException {
3578            if (sequence != null) {
3579                Long nextPosition = new Long(sequence.longValue() + 1);
3580                lastDefaultKey = sequence;
3581                cursor.defaultCursorPosition = nextPosition.longValue();
3582                lastHighKey = sequence;
3583                cursor.highPriorityCursorPosition = nextPosition.longValue();
3584                lastLowKey = sequence;
3585                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3586            }
3587        }
3588
3589        void setBatch(Transaction tx, LastAck last) throws IOException {
3590            setBatch(tx, last.lastAckedSequence);
3591            if (cursor.defaultCursorPosition == 0
3592                    && cursor.highPriorityCursorPosition == 0
3593                    && cursor.lowPriorityCursorPosition == 0) {
3594                long next = last.lastAckedSequence + 1;
3595                switch (last.priority) {
3596                    case DEF:
3597                        cursor.defaultCursorPosition = next;
3598                        cursor.highPriorityCursorPosition = next;
3599                        break;
3600                    case HI:
3601                        cursor.highPriorityCursorPosition = next;
3602                        break;
3603                    case LO:
3604                        cursor.lowPriorityCursorPosition = next;
3605                        cursor.defaultCursorPosition = next;
3606                        cursor.highPriorityCursorPosition = next;
3607                        break;
3608                }
3609            }
3610        }
3611
3612        void stoppedIterating() {
3613            if (lastDefaultKey!=null) {
3614                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3615            }
3616            if (lastHighKey!=null) {
3617                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3618            }
3619            if (lastLowKey!=null) {
3620                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3621            }
3622            lastDefaultKey = null;
3623            lastHighKey = null;
3624            lastLowKey = null;
3625        }
3626
3627        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3628                throws IOException {
3629            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3630                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3631            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3632                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3633            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3634                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3635            }
3636        }
3637
3638        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3639                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3640
3641            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3642            deletes.add(iterator.next());
3643        }
3644
3645        long getNextMessageId() {
3646            return nextMessageId++;
3647        }
3648
3649        void revertNextMessageId() {
3650            nextMessageId--;
3651        }
3652
3653        MessageKeys get(Transaction tx, Long key) throws IOException {
3654            MessageKeys result = defaultPriorityIndex.get(tx, key);
3655            if (result == null) {
3656                result = highPriorityIndex.get(tx, key);
3657                if (result == null) {
3658                    result = lowPriorityIndex.get(tx, key);
3659                    lastGetPriority = LO;
3660                } else {
3661                    lastGetPriority = HI;
3662                }
3663            } else {
3664                lastGetPriority = DEF;
3665            }
3666            return result;
3667        }
3668
3669        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3670            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3671                return defaultPriorityIndex.put(tx, key, value);
3672            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3673                return highPriorityIndex.put(tx, key, value);
3674            } else {
3675                return lowPriorityIndex.put(tx, key, value);
3676            }
3677        }
3678
3679        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3680            return new MessageOrderIterator(tx,cursor,this);
3681        }
3682
3683        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3684            return new MessageOrderIterator(tx,m,this);
3685        }
3686
3687        public byte lastGetPriority() {
3688            return lastGetPriority;
3689        }
3690
3691        public boolean alreadyDispatched(Long sequence) {
3692            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3693                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3694                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3695        }
3696
3697        public void trackPendingAdd(Long seq) {
3698            synchronized (pendingAdditions) {
3699                pendingAdditions.add(seq);
3700            }
3701        }
3702
3703        public void trackPendingAddComplete(Long seq) {
3704            synchronized (pendingAdditions) {
3705                pendingAdditions.remove(seq);
3706            }
3707        }
3708
3709        public Long minPendingAdd() {
3710            synchronized (pendingAdditions) {
3711                if (!pendingAdditions.isEmpty()) {
3712                    return pendingAdditions.get(0);
3713                } else {
3714                    return null;
3715                }
3716            }
3717        }
3718
3719        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3720            Iterator<Entry<Long, MessageKeys>>currentIterator;
3721            final Iterator<Entry<Long, MessageKeys>>highIterator;
3722            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3723            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3724
3725            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3726                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3727                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3728                if (highPriorityIndex != null) {
3729                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3730                } else {
3731                    this.highIterator = null;
3732                }
3733                if (lowPriorityIndex != null) {
3734                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3735                } else {
3736                    this.lowIterator = null;
3737                }
3738            }
3739
3740            @Override
3741            public boolean hasNext() {
3742                if (currentIterator == null) {
3743                    if (highIterator != null) {
3744                        if (highIterator.hasNext()) {
3745                            currentIterator = highIterator;
3746                            return currentIterator.hasNext();
3747                        }
3748                        if (defaultIterator.hasNext()) {
3749                            currentIterator = defaultIterator;
3750                            return currentIterator.hasNext();
3751                        }
3752                        if (lowIterator.hasNext()) {
3753                            currentIterator = lowIterator;
3754                            return currentIterator.hasNext();
3755                        }
3756                        return false;
3757                    } else {
3758                        currentIterator = defaultIterator;
3759                        return currentIterator.hasNext();
3760                    }
3761                }
3762                if (highIterator != null) {
3763                    if (currentIterator.hasNext()) {
3764                        return true;
3765                    }
3766                    if (currentIterator == highIterator) {
3767                        if (defaultIterator.hasNext()) {
3768                            currentIterator = defaultIterator;
3769                            return currentIterator.hasNext();
3770                        }
3771                        if (lowIterator.hasNext()) {
3772                            currentIterator = lowIterator;
3773                            return currentIterator.hasNext();
3774                        }
3775                        return false;
3776                    }
3777
3778                    if (currentIterator == defaultIterator) {
3779                        if (lowIterator.hasNext()) {
3780                            currentIterator = lowIterator;
3781                            return currentIterator.hasNext();
3782                        }
3783                        return false;
3784                    }
3785                }
3786                return currentIterator.hasNext();
3787            }
3788
3789            @Override
3790            public Entry<Long, MessageKeys> next() {
3791                Entry<Long, MessageKeys> result = currentIterator.next();
3792                if (result != null) {
3793                    Long key = result.getKey();
3794                    if (highIterator != null) {
3795                        if (currentIterator == defaultIterator) {
3796                            lastDefaultKey = key;
3797                        } else if (currentIterator == highIterator) {
3798                            lastHighKey = key;
3799                        } else {
3800                            lastLowKey = key;
3801                        }
3802                    } else {
3803                        lastDefaultKey = key;
3804                    }
3805                }
3806                return result;
3807            }
3808
3809            @Override
3810            public void remove() {
3811                throw new UnsupportedOperationException();
3812            }
3813        }
3814    }
3815
3816    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3817        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3818
3819        @Override
3820        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3821            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3822            ObjectOutputStream oout = new ObjectOutputStream(baos);
3823            oout.writeObject(object);
3824            oout.flush();
3825            oout.close();
3826            byte[] data = baos.toByteArray();
3827            dataOut.writeInt(data.length);
3828            dataOut.write(data);
3829        }
3830
3831        @Override
3832        @SuppressWarnings("unchecked")
3833        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3834            int dataLen = dataIn.readInt();
3835            byte[] data = new byte[dataLen];
3836            dataIn.readFully(data);
3837            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3838            ObjectInputStream oin = new ObjectInputStream(bais);
3839            try {
3840                return (HashSet<String>) oin.readObject();
3841            } catch (ClassNotFoundException cfe) {
3842                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3843                ioe.initCause(cfe);
3844                throw ioe;
3845            }
3846        }
3847    }
3848
3849    public File getIndexDirectory() {
3850        return indexDirectory;
3851    }
3852
3853    public void setIndexDirectory(File indexDirectory) {
3854        this.indexDirectory = indexDirectory;
3855    }
3856
3857    interface IndexAware {
3858        public void sequenceAssignedWithIndexLocked(long index);
3859    }
3860
3861    public String getPreallocationScope() {
3862        return preallocationScope;
3863    }
3864
3865    public void setPreallocationScope(String preallocationScope) {
3866        this.preallocationScope = preallocationScope;
3867    }
3868
3869    public String getPreallocationStrategy() {
3870        return preallocationStrategy;
3871    }
3872
3873    public void setPreallocationStrategy(String preallocationStrategy) {
3874        this.preallocationStrategy = preallocationStrategy;
3875    }
3876
3877    public int getCompactAcksAfterNoGC() {
3878        return compactAcksAfterNoGC;
3879    }
3880
3881    /**
3882     * Sets the number of GC cycles where no journal logs were removed before an attempt to
3883     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
3884     * <p>
3885     * A value of -1 will disable this feature.
3886     *
3887     * @param compactAcksAfterNoGC
3888     *      Number of empty GC cycles before we rewrite old ACKS.
3889     */
3890    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
3891        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
3892    }
3893
3894    /**
3895     * Returns whether Ack compaction will ignore that the store is still growing
3896     * and run more often.
3897     *
3898     * @return the compactAcksIgnoresStoreGrowth current value.
3899     */
3900    public boolean isCompactAcksIgnoresStoreGrowth() {
3901        return compactAcksIgnoresStoreGrowth;
3902    }
3903
3904    /**
3905     * Configure if Ack compaction will occur regardless of continued growth of the
3906     * journal logs meaning that the store has not run out of space yet.  Because the
3907     * compaction operation can be costly this value is defaulted to off and the Ack
3908     * compaction is only done when it seems that the store cannot grow and larger.
3909     *
3910     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
3911     */
3912    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
3913        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
3914    }
3915
3916    /**
3917     * Returns whether Ack compaction is enabled
3918     *
3919     * @return enableAckCompaction
3920     */
3921    public boolean isEnableAckCompaction() {
3922        return enableAckCompaction;
3923    }
3924
3925    /**
3926     * Configure if the Ack compaction task should be enabled to run
3927     *
3928     * @param enableAckCompaction
3929     */
3930    public void setEnableAckCompaction(boolean enableAckCompaction) {
3931        this.enableAckCompaction = enableAckCompaction;
3932    }
3933
3934    /**
3935     * @return
3936     */
3937    public boolean isEnableSubscriptionStatistics() {
3938        return enableSubscriptionStatistics;
3939    }
3940
3941    /**
3942     * Enable caching statistics for each subscription to allow non-blocking
3943     * retrieval of metrics.  This could incur some overhead to compute if there are a lot
3944     * of subscriptions.
3945     *
3946     * @param enableSubscriptionStatistics
3947     */
3948    public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
3949        this.enableSubscriptionStatistics = enableSubscriptionStatistics;
3950    }
3951}