001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.EOFException;
024import java.io.File;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.InterruptedIOException;
028import java.io.ObjectInputStream;
029import java.io.ObjectOutputStream;
030import java.io.OutputStream;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.Date;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.LinkedHashMap;
040import java.util.LinkedHashSet;
041import java.util.LinkedList;
042import java.util.List;
043import java.util.Map;
044import java.util.Map.Entry;
045import java.util.Set;
046import java.util.SortedSet;
047import java.util.TreeMap;
048import java.util.TreeSet;
049import java.util.concurrent.ConcurrentHashMap;
050import java.util.concurrent.ConcurrentMap;
051import java.util.concurrent.atomic.AtomicBoolean;
052import java.util.concurrent.atomic.AtomicLong;
053import java.util.concurrent.locks.ReentrantReadWriteLock;
054
055import org.apache.activemq.ActiveMQMessageAuditNoSync;
056import org.apache.activemq.broker.BrokerService;
057import org.apache.activemq.broker.BrokerServiceAware;
058import org.apache.activemq.broker.region.Destination;
059import org.apache.activemq.broker.region.Queue;
060import org.apache.activemq.broker.region.Topic;
061import org.apache.activemq.command.MessageAck;
062import org.apache.activemq.command.TransactionId;
063import org.apache.activemq.openwire.OpenWireFormat;
064import org.apache.activemq.protobuf.Buffer;
065import org.apache.activemq.store.MessageStore;
066import org.apache.activemq.store.MessageStoreStatistics;
067import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
068import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
069import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
070import org.apache.activemq.store.kahadb.data.KahaDestination;
071import org.apache.activemq.store.kahadb.data.KahaEntryType;
072import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
073import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
074import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
075import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
076import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
077import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
078import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
079import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
080import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
081import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
082import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
083import org.apache.activemq.store.kahadb.disk.index.ListIndex;
084import org.apache.activemq.store.kahadb.disk.journal.DataFile;
085import org.apache.activemq.store.kahadb.disk.journal.Journal;
086import org.apache.activemq.store.kahadb.disk.journal.Location;
087import org.apache.activemq.store.kahadb.disk.page.Page;
088import org.apache.activemq.store.kahadb.disk.page.PageFile;
089import org.apache.activemq.store.kahadb.disk.page.Transaction;
090import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
091import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
092import org.apache.activemq.store.kahadb.disk.util.Marshaller;
093import org.apache.activemq.store.kahadb.disk.util.Sequence;
094import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
095import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
096import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
097import org.apache.activemq.util.ByteSequence;
098import org.apache.activemq.util.DataByteArrayInputStream;
099import org.apache.activemq.util.DataByteArrayOutputStream;
100import org.apache.activemq.util.IOHelper;
101import org.apache.activemq.util.ServiceStopper;
102import org.apache.activemq.util.ServiceSupport;
103import org.slf4j.Logger;
104import org.slf4j.LoggerFactory;
105
106public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
107
108    protected BrokerService brokerService;
109
110    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
111    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
112    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
113    protected static final Buffer UNMATCHED;
114    static {
115        UNMATCHED = new Buffer(new byte[]{});
116    }
117    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
118
119    static final int CLOSED_STATE = 1;
120    static final int OPEN_STATE = 2;
121    static final long NOT_ACKED = -1;
122
123    static final int VERSION = 6;
124
125    protected class Metadata {
126        protected Page<Metadata> page;
127        protected int state;
128        protected BTreeIndex<String, StoredDestination> destinations;
129        protected Location lastUpdate;
130        protected Location firstInProgressTransactionLocation;
131        protected Location producerSequenceIdTrackerLocation = null;
132        protected Location ackMessageFileMapLocation = null;
133        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
134        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
135        protected int version = VERSION;
136        protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
137
138        public void read(DataInput is) throws IOException {
139            state = is.readInt();
140            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
141            if (is.readBoolean()) {
142                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
143            } else {
144                lastUpdate = null;
145            }
146            if (is.readBoolean()) {
147                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
148            } else {
149                firstInProgressTransactionLocation = null;
150            }
151            try {
152                if (is.readBoolean()) {
153                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
154                } else {
155                    producerSequenceIdTrackerLocation = null;
156                }
157            } catch (EOFException expectedOnUpgrade) {
158            }
159            try {
160                version = is.readInt();
161            } catch (EOFException expectedOnUpgrade) {
162                version = 1;
163            }
164            if (version >= 5 && is.readBoolean()) {
165                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
166            } else {
167                ackMessageFileMapLocation = null;
168            }
169            try {
170                openwireVersion = is.readInt();
171            } catch (EOFException expectedOnUpgrade) {
172                openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
173            }
174            LOG.info("KahaDB is version " + version);
175        }
176
177        public void write(DataOutput os) throws IOException {
178            os.writeInt(state);
179            os.writeLong(destinations.getPageId());
180
181            if (lastUpdate != null) {
182                os.writeBoolean(true);
183                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
184            } else {
185                os.writeBoolean(false);
186            }
187
188            if (firstInProgressTransactionLocation != null) {
189                os.writeBoolean(true);
190                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
191            } else {
192                os.writeBoolean(false);
193            }
194
195            if (producerSequenceIdTrackerLocation != null) {
196                os.writeBoolean(true);
197                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
198            } else {
199                os.writeBoolean(false);
200            }
201            os.writeInt(VERSION);
202            if (ackMessageFileMapLocation != null) {
203                os.writeBoolean(true);
204                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
205            } else {
206                os.writeBoolean(false);
207            }
208            os.writeInt(this.openwireVersion);
209        }
210    }
211
212    class MetadataMarshaller extends VariableMarshaller<Metadata> {
213        @Override
214        public Metadata readPayload(DataInput dataIn) throws IOException {
215            Metadata rc = createMetadata();
216            rc.read(dataIn);
217            return rc;
218        }
219
220        @Override
221        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
222            object.write(dataOut);
223        }
224    }
225
226    protected PageFile pageFile;
227    protected Journal journal;
228    protected Metadata metadata = new Metadata();
229
230    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
231
232    protected boolean failIfDatabaseIsLocked;
233
234    protected boolean deleteAllMessages;
235    protected File directory = DEFAULT_DIRECTORY;
236    protected File indexDirectory = null;
237    protected Thread checkpointThread;
238    protected boolean enableJournalDiskSyncs=true;
239    protected boolean archiveDataLogs;
240    protected File directoryArchive;
241    protected AtomicLong journalSize = new AtomicLong(0);
242    long checkpointInterval = 5*1000;
243    long cleanupInterval = 30*1000;
244    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
245    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
246    boolean enableIndexWriteAsync = false;
247    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
248    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
249    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
250
251    protected AtomicBoolean opened = new AtomicBoolean();
252    private boolean ignoreMissingJournalfiles = false;
253    private int indexCacheSize = 10000;
254    private boolean checkForCorruptJournalFiles = false;
255    private boolean checksumJournalFiles = true;
256    protected boolean forceRecoverIndex = false;
257    private final Object checkpointThreadLock = new Object();
258    private boolean archiveCorruptedIndex = false;
259    private boolean useIndexLFRUEviction = false;
260    private float indexLFUEvictionFactor = 0.2f;
261    private boolean enableIndexDiskSyncs = true;
262    private boolean enableIndexRecoveryFile = true;
263    private boolean enableIndexPageCaching = true;
264    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
265
266    @Override
267    public void doStart() throws Exception {
268        load();
269    }
270
271    @Override
272    public void doStop(ServiceStopper stopper) throws Exception {
273        unload();
274    }
275
276    private void loadPageFile() throws IOException {
277        this.indexLock.writeLock().lock();
278        try {
279            final PageFile pageFile = getPageFile();
280            pageFile.load();
281            pageFile.tx().execute(new Transaction.Closure<IOException>() {
282                @Override
283                public void execute(Transaction tx) throws IOException {
284                    if (pageFile.getPageCount() == 0) {
285                        // First time this is created.. Initialize the metadata
286                        Page<Metadata> page = tx.allocate();
287                        assert page.getPageId() == 0;
288                        page.set(metadata);
289                        metadata.page = page;
290                        metadata.state = CLOSED_STATE;
291                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
292
293                        tx.store(metadata.page, metadataMarshaller, true);
294                    } else {
295                        Page<Metadata> page = tx.load(0, metadataMarshaller);
296                        metadata = page.get();
297                        metadata.page = page;
298                    }
299                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
300                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
301                    metadata.destinations.load(tx);
302                }
303            });
304            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
305            // Perhaps we should just keep an index of file
306            storedDestinations.clear();
307            pageFile.tx().execute(new Transaction.Closure<IOException>() {
308                @Override
309                public void execute(Transaction tx) throws IOException {
310                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
311                        Entry<String, StoredDestination> entry = iterator.next();
312                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
313                        storedDestinations.put(entry.getKey(), sd);
314
315                        if (checkForCorruptJournalFiles) {
316                            // sanity check the index also
317                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
318                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
319                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
320                                }
321                            }
322                        }
323                    }
324                }
325            });
326            pageFile.flush();
327        } finally {
328            this.indexLock.writeLock().unlock();
329        }
330    }
331
332    private void startCheckpoint() {
333        if (checkpointInterval == 0 &&  cleanupInterval == 0) {
334            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
335            return;
336        }
337        synchronized (checkpointThreadLock) {
338            boolean start = false;
339            if (checkpointThread == null) {
340                start = true;
341            } else if (!checkpointThread.isAlive()) {
342                start = true;
343                LOG.info("KahaDB: Recovering checkpoint thread after death");
344            }
345            if (start) {
346                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
347                    @Override
348                    public void run() {
349                        try {
350                            long lastCleanup = System.currentTimeMillis();
351                            long lastCheckpoint = System.currentTimeMillis();
352                            // Sleep for a short time so we can periodically check
353                            // to see if we need to exit this thread.
354                            long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
355                            while (opened.get()) {
356                                Thread.sleep(sleepTime);
357                                long now = System.currentTimeMillis();
358                                if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
359                                    checkpointCleanup(true);
360                                    lastCleanup = now;
361                                    lastCheckpoint = now;
362                                } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
363                                    checkpointCleanup(false);
364                                    lastCheckpoint = now;
365                                }
366                            }
367                        } catch (InterruptedException e) {
368                            // Looks like someone really wants us to exit this thread...
369                        } catch (IOException ioe) {
370                            LOG.error("Checkpoint failed", ioe);
371                            brokerService.handleIOException(ioe);
372                        }
373                    }
374                };
375
376                checkpointThread.setDaemon(true);
377                checkpointThread.start();
378            }
379        }
380    }
381
382    public void open() throws IOException {
383        if( opened.compareAndSet(false, true) ) {
384            getJournal().start();
385            try {
386                loadPageFile();
387            } catch (Throwable t) {
388                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
389                if (LOG.isDebugEnabled()) {
390                    LOG.debug("Index load failure", t);
391                }
392                // try to recover index
393                try {
394                    pageFile.unload();
395                } catch (Exception ignore) {}
396                if (archiveCorruptedIndex) {
397                    pageFile.archive();
398                } else {
399                    pageFile.delete();
400                }
401                metadata = createMetadata();
402                //The metadata was recreated after a detect corruption so we need to
403                //reconfigure anything that was configured on the old metadata on startup
404                configureMetadata();
405                pageFile = null;
406                loadPageFile();
407            }
408            startCheckpoint();
409            recover();
410        }
411    }
412
413    public void load() throws IOException {
414        this.indexLock.writeLock().lock();
415        IOHelper.mkdirs(directory);
416        try {
417            if (deleteAllMessages) {
418                getJournal().start();
419                getJournal().delete();
420                getJournal().close();
421                journal = null;
422                getPageFile().delete();
423                LOG.info("Persistence store purged.");
424                deleteAllMessages = false;
425            }
426
427            open();
428            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
429        } finally {
430            this.indexLock.writeLock().unlock();
431        }
432    }
433
434    public void close() throws IOException, InterruptedException {
435        if( opened.compareAndSet(true, false)) {
436            checkpointLock.writeLock().lock();
437            try {
438                if (metadata.page != null) {
439                    checkpointUpdate(true);
440                }
441                pageFile.unload();
442                metadata = createMetadata();
443            } finally {
444                checkpointLock.writeLock().unlock();
445            }
446            journal.close();
447            synchronized (checkpointThreadLock) {
448                if (checkpointThread != null) {
449                    checkpointThread.join();
450                }
451            }
452            //clear the cache and journalSize on shutdown of the store
453            storeCache.clear();
454            journalSize.set(0);
455        }
456    }
457
458    public void unload() throws IOException, InterruptedException {
459        this.indexLock.writeLock().lock();
460        try {
461            if( pageFile != null && pageFile.isLoaded() ) {
462                metadata.state = CLOSED_STATE;
463                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
464
465                if (metadata.page != null) {
466                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
467                        @Override
468                        public void execute(Transaction tx) throws IOException {
469                            tx.store(metadata.page, metadataMarshaller, true);
470                        }
471                    });
472                }
473            }
474        } finally {
475            this.indexLock.writeLock().unlock();
476        }
477        close();
478    }
479
480    // public for testing
481    @SuppressWarnings("rawtypes")
482    public Location[] getInProgressTxLocationRange() {
483        Location[] range = new Location[]{null, null};
484        synchronized (inflightTransactions) {
485            if (!inflightTransactions.isEmpty()) {
486                for (List<Operation> ops : inflightTransactions.values()) {
487                    if (!ops.isEmpty()) {
488                        trackMaxAndMin(range, ops);
489                    }
490                }
491            }
492            if (!preparedTransactions.isEmpty()) {
493                for (List<Operation> ops : preparedTransactions.values()) {
494                    if (!ops.isEmpty()) {
495                        trackMaxAndMin(range, ops);
496                    }
497                }
498            }
499        }
500        return range;
501    }
502
503    @SuppressWarnings("rawtypes")
504    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
505        Location t = ops.get(0).getLocation();
506        if (range[0]==null || t.compareTo(range[0]) <= 0) {
507            range[0] = t;
508        }
509        t = ops.get(ops.size() -1).getLocation();
510        if (range[1]==null || t.compareTo(range[1]) >= 0) {
511            range[1] = t;
512        }
513    }
514
515    class TranInfo {
516        TransactionId id;
517        Location location;
518
519        class opCount {
520            int add;
521            int remove;
522        }
523        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
524
525        @SuppressWarnings("rawtypes")
526        public void track(Operation operation) {
527            if (location == null ) {
528                location = operation.getLocation();
529            }
530            KahaDestination destination;
531            boolean isAdd = false;
532            if (operation instanceof AddOperation) {
533                AddOperation add = (AddOperation) operation;
534                destination = add.getCommand().getDestination();
535                isAdd = true;
536            } else {
537                RemoveOperation removeOpperation = (RemoveOperation) operation;
538                destination = removeOpperation.getCommand().getDestination();
539            }
540            opCount opCount = destinationOpCount.get(destination);
541            if (opCount == null) {
542                opCount = new opCount();
543                destinationOpCount.put(destination, opCount);
544            }
545            if (isAdd) {
546                opCount.add++;
547            } else {
548                opCount.remove++;
549            }
550        }
551
552        @Override
553        public String toString() {
554           StringBuffer buffer = new StringBuffer();
555           buffer.append(location).append(";").append(id).append(";\n");
556           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
557               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
558           }
559           return buffer.toString();
560        }
561    }
562
563    @SuppressWarnings("rawtypes")
564    public String getTransactions() {
565
566        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
567        synchronized (inflightTransactions) {
568            if (!inflightTransactions.isEmpty()) {
569                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
570                    TranInfo info = new TranInfo();
571                    info.id = entry.getKey();
572                    for (Operation operation : entry.getValue()) {
573                        info.track(operation);
574                    }
575                    infos.add(info);
576                }
577            }
578        }
579        synchronized (preparedTransactions) {
580            if (!preparedTransactions.isEmpty()) {
581                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
582                    TranInfo info = new TranInfo();
583                    info.id = entry.getKey();
584                    for (Operation operation : entry.getValue()) {
585                        info.track(operation);
586                    }
587                    infos.add(info);
588                }
589            }
590        }
591        return infos.toString();
592    }
593
594    /**
595     * Move all the messages that were in the journal into long term storage. We
596     * just replay and do a checkpoint.
597     *
598     * @throws IOException
599     * @throws IOException
600     * @throws IllegalStateException
601     */
602    private void recover() throws IllegalStateException, IOException {
603        this.indexLock.writeLock().lock();
604        try {
605
606            long start = System.currentTimeMillis();
607            Location producerAuditPosition = recoverProducerAudit();
608            Location ackMessageFileLocation = recoverAckMessageFileMap();
609            Location lastIndoubtPosition = getRecoveryPosition();
610
611            Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
612            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
613
614            if (recoveryPosition != null) {
615                int redoCounter = 0;
616                LOG.info("Recovering from the journal @" + recoveryPosition);
617                while (recoveryPosition != null) {
618                    try {
619                        JournalCommand<?> message = load(recoveryPosition);
620                        metadata.lastUpdate = recoveryPosition;
621                        process(message, recoveryPosition, lastIndoubtPosition);
622                        redoCounter++;
623                    } catch (IOException failedRecovery) {
624                        if (isIgnoreMissingJournalfiles()) {
625                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
626                            // track this dud location
627                            journal.corruptRecoveryLocation(recoveryPosition);
628                        } else {
629                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
630                        }
631                    }
632                    recoveryPosition = journal.getNextLocation(recoveryPosition);
633                     if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
634                         LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries recovered ..");
635                     }
636                }
637                if (LOG.isInfoEnabled()) {
638                    long end = System.currentTimeMillis();
639                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
640                }
641            }
642
643            // We may have to undo some index updates.
644            pageFile.tx().execute(new Transaction.Closure<IOException>() {
645                @Override
646                public void execute(Transaction tx) throws IOException {
647                    recoverIndex(tx);
648                }
649            });
650
651            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
652            Set<TransactionId> toRollback = new HashSet<TransactionId>();
653            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
654            synchronized (inflightTransactions) {
655                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
656                    TransactionId id = it.next();
657                    if (id.isLocalTransaction()) {
658                        toRollback.add(id);
659                    } else {
660                        toDiscard.add(id);
661                    }
662                }
663                for (TransactionId tx: toRollback) {
664                    if (LOG.isDebugEnabled()) {
665                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
666                    }
667                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
668                }
669                for (TransactionId tx: toDiscard) {
670                    if (LOG.isDebugEnabled()) {
671                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
672                    }
673                    inflightTransactions.remove(tx);
674                }
675            }
676
677            synchronized (preparedTransactions) {
678                for (TransactionId txId : preparedTransactions.keySet()) {
679                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
680                }
681            }
682
683        } finally {
684            this.indexLock.writeLock().unlock();
685        }
686    }
687
688    @SuppressWarnings("unused")
689    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
690        return TransactionIdConversion.convertToLocal(tx);
691    }
692
693    private Location minimum(Location producerAuditPosition,
694            Location lastIndoubtPosition) {
695        Location min = null;
696        if (producerAuditPosition != null) {
697            min = producerAuditPosition;
698            if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
699                min = lastIndoubtPosition;
700            }
701        } else {
702            min = lastIndoubtPosition;
703        }
704        return min;
705    }
706
707    private Location recoverProducerAudit() throws IOException {
708        if (metadata.producerSequenceIdTrackerLocation != null) {
709            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
710            try {
711                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
712                int maxNumProducers = getMaxFailoverProducersToTrack();
713                int maxAuditDepth = getFailoverProducersAuditDepth();
714                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
715                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
716                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
717                return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
718            } catch (Exception e) {
719                LOG.warn("Cannot recover message audit", e);
720                return journal.getNextLocation(null);
721            }
722        } else {
723            // got no audit stored so got to recreate via replay from start of the journal
724            return journal.getNextLocation(null);
725        }
726    }
727
728    @SuppressWarnings("unchecked")
729    private Location recoverAckMessageFileMap() throws IOException {
730        if (metadata.ackMessageFileMapLocation != null) {
731            KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
732            try {
733                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
734                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
735                return journal.getNextLocation(metadata.ackMessageFileMapLocation);
736            } catch (Exception e) {
737                LOG.warn("Cannot recover ackMessageFileMap", e);
738                return journal.getNextLocation(null);
739            }
740        } else {
741            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
742            return journal.getNextLocation(null);
743        }
744    }
745
746    protected void recoverIndex(Transaction tx) throws IOException {
747        long start = System.currentTimeMillis();
748        // It is possible index updates got applied before the journal updates..
749        // in that case we need to removed references to messages that are not in the journal
750        final Location lastAppendLocation = journal.getLastAppendLocation();
751        long undoCounter=0;
752
753        // Go through all the destinations to see if they have messages past the lastAppendLocation
754        for (String key : storedDestinations.keySet()) {
755            StoredDestination sd = storedDestinations.get(key);
756
757            final ArrayList<Long> matches = new ArrayList<Long>();
758            // Find all the Locations that are >= than the last Append Location.
759            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
760                @Override
761                protected void matched(Location key, Long value) {
762                    matches.add(value);
763                }
764            });
765
766            for (Long sequenceId : matches) {
767                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
768                if (keys != null) {
769                    sd.locationIndex.remove(tx, keys.location);
770                    sd.messageIdIndex.remove(tx, keys.messageId);
771                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
772                    undoCounter++;
773                    decrementAndSubSizeToStoreStat(key, keys.location.getSize());
774                    // TODO: do we need to modify the ack positions for the pub sub case?
775                }
776            }
777        }
778
779        if( undoCounter > 0 ) {
780            // The rolledback operations are basically in flight journal writes.  To avoid getting
781            // these the end user should do sync writes to the journal.
782            if (LOG.isInfoEnabled()) {
783                long end = System.currentTimeMillis();
784                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
785            }
786        }
787
788        undoCounter = 0;
789        start = System.currentTimeMillis();
790
791        // Lets be extra paranoid here and verify that all the datafiles being referenced
792        // by the indexes still exists.
793
794        final SequenceSet ss = new SequenceSet();
795        for (StoredDestination sd : storedDestinations.values()) {
796            // Use a visitor to cut down the number of pages that we load
797            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
798                int last=-1;
799
800                @Override
801                public boolean isInterestedInKeysBetween(Location first, Location second) {
802                    if( first==null ) {
803                        return !ss.contains(0, second.getDataFileId());
804                    } else if( second==null ) {
805                        return true;
806                    } else {
807                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
808                    }
809                }
810
811                @Override
812                public void visit(List<Location> keys, List<Long> values) {
813                    for (Location l : keys) {
814                        int fileId = l.getDataFileId();
815                        if( last != fileId ) {
816                            ss.add(fileId);
817                            last = fileId;
818                        }
819                    }
820                }
821
822            });
823        }
824        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
825        while (!ss.isEmpty()) {
826            missingJournalFiles.add((int) ss.removeFirst());
827        }
828
829        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
830            missingJournalFiles.add(entry.getKey());
831            for (Integer i : entry.getValue()) {
832                missingJournalFiles.add(i);
833            }
834        }
835
836        missingJournalFiles.removeAll(journal.getFileMap().keySet());
837
838        if (!missingJournalFiles.isEmpty()) {
839            LOG.warn("Some journal files are missing: " + missingJournalFiles);
840        }
841
842        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
843        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
844        for (Integer missing : missingJournalFiles) {
845            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
846        }
847
848        if (checkForCorruptJournalFiles) {
849            Collection<DataFile> dataFiles = journal.getFileMap().values();
850            for (DataFile dataFile : dataFiles) {
851                int id = dataFile.getDataFileId();
852                // eof to next file id
853                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
854                Sequence seq = dataFile.getCorruptedBlocks().getHead();
855                while (seq != null) {
856                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
857                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
858                    missingPredicates.add(visitor);
859                    knownCorruption.add(visitor);
860                    seq = seq.getNext();
861                }
862            }
863        }
864
865        if (!missingPredicates.isEmpty()) {
866            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
867                final StoredDestination sd = sdEntry.getValue();
868                final ArrayList<Long> matches = new ArrayList<Long>();
869                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
870                    @Override
871                    protected void matched(Location key, Long value) {
872                        matches.add(value);
873                    }
874                });
875
876                // If some message references are affected by the missing data files...
877                if (!matches.isEmpty()) {
878
879                    // We either 'gracefully' recover dropping the missing messages or
880                    // we error out.
881                    if( ignoreMissingJournalfiles ) {
882                        // Update the index to remove the references to the missing data
883                        for (Long sequenceId : matches) {
884                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
885                            sd.locationIndex.remove(tx, keys.location);
886                            sd.messageIdIndex.remove(tx, keys.messageId);
887                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
888                            undoCounter++;
889                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
890                            // TODO: do we need to modify the ack positions for the pub sub case?
891                        }
892                    } else {
893                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected.");
894                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
895                    }
896                }
897            }
898        }
899
900        if (!ignoreMissingJournalfiles) {
901            if (!knownCorruption.isEmpty()) {
902                LOG.error("Detected corrupt journal files. " + knownCorruption);
903                throw new IOException("Detected corrupt journal files. " + knownCorruption);
904            }
905
906            if (!missingJournalFiles.isEmpty()) {
907                LOG.error("Detected missing journal files. " + missingJournalFiles);
908                throw new IOException("Detected missing journal files. " + missingJournalFiles);
909            }
910        }
911
912        if( undoCounter > 0 ) {
913            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
914            // should do sync writes to the journal.
915            if (LOG.isInfoEnabled()) {
916                long end = System.currentTimeMillis();
917                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
918            }
919        }
920    }
921
922    private Location nextRecoveryPosition;
923    private Location lastRecoveryPosition;
924
925    public void incrementalRecover() throws IOException {
926        this.indexLock.writeLock().lock();
927        try {
928            if( nextRecoveryPosition == null ) {
929                if( lastRecoveryPosition==null ) {
930                    nextRecoveryPosition = getRecoveryPosition();
931                } else {
932                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
933                }
934            }
935            while (nextRecoveryPosition != null) {
936                lastRecoveryPosition = nextRecoveryPosition;
937                metadata.lastUpdate = lastRecoveryPosition;
938                JournalCommand<?> message = load(lastRecoveryPosition);
939                process(message, lastRecoveryPosition, (IndexAware) null);
940                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
941            }
942        } finally {
943            this.indexLock.writeLock().unlock();
944        }
945    }
946
947    public Location getLastUpdatePosition() throws IOException {
948        return metadata.lastUpdate;
949    }
950
951    private Location getRecoveryPosition() throws IOException {
952
953        if (!this.forceRecoverIndex) {
954
955            // If we need to recover the transactions..
956            if (metadata.firstInProgressTransactionLocation != null) {
957                return metadata.firstInProgressTransactionLocation;
958            }
959
960            // Perhaps there were no transactions...
961            if( metadata.lastUpdate!=null) {
962                // Start replay at the record after the last one recorded in the index file.
963                return journal.getNextLocation(metadata.lastUpdate);
964            }
965        }
966        // This loads the first position.
967        return journal.getNextLocation(null);
968    }
969
970    protected void checkpointCleanup(final boolean cleanup) throws IOException {
971        long start;
972        this.indexLock.writeLock().lock();
973        try {
974            start = System.currentTimeMillis();
975            if( !opened.get() ) {
976                return;
977            }
978        } finally {
979            this.indexLock.writeLock().unlock();
980        }
981        checkpointUpdate(cleanup);
982        long end = System.currentTimeMillis();
983        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
984            if (LOG.isInfoEnabled()) {
985                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
986            }
987        }
988    }
989
990    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
991        int size = data.serializedSizeFramed();
992        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
993        os.writeByte(data.type().getNumber());
994        data.writeFramed(os);
995        return os.toByteSequence();
996    }
997
998    // /////////////////////////////////////////////////////////////////
999    // Methods call by the broker to update and query the store.
1000    // /////////////////////////////////////////////////////////////////
1001    public Location store(JournalCommand<?> data) throws IOException {
1002        return store(data, false, null,null);
1003    }
1004
1005    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
1006        return store(data, false, null, null, onJournalStoreComplete);
1007    }
1008
1009    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
1010        return store(data, sync, before, after, null);
1011    }
1012
1013    /**
1014     * All updated are are funneled through this method. The updates are converted
1015     * to a JournalMessage which is logged to the journal and then the data from
1016     * the JournalMessage is used to update the index just like it would be done
1017     * during a recovery process.
1018     */
1019    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
1020        try {
1021            ByteSequence sequence = toByteSequence(data);
1022
1023            Location location;
1024            checkpointLock.readLock().lock();
1025            try {
1026
1027                long start = System.currentTimeMillis();
1028                location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
1029                long start2 = System.currentTimeMillis();
1030                process(data, location, before);
1031
1032                long end = System.currentTimeMillis();
1033                if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1034                    if (LOG.isInfoEnabled()) {
1035                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
1036                    }
1037                }
1038
1039            } finally{
1040                checkpointLock.readLock().unlock();
1041            }
1042            if (after != null) {
1043                after.run();
1044            }
1045
1046            if (checkpointThread != null && !checkpointThread.isAlive() && opened.get()) {
1047                startCheckpoint();
1048            }
1049            return location;
1050        } catch (IOException ioe) {
1051            LOG.error("KahaDB failed to store to Journal", ioe);
1052            brokerService.handleIOException(ioe);
1053            throw ioe;
1054        }
1055    }
1056
1057    /**
1058     * Loads a previously stored JournalMessage
1059     *
1060     * @param location
1061     * @return
1062     * @throws IOException
1063     */
1064    public JournalCommand<?> load(Location location) throws IOException {
1065        long start = System.currentTimeMillis();
1066        ByteSequence data = journal.read(location);
1067        long end = System.currentTimeMillis();
1068        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1069            if (LOG.isInfoEnabled()) {
1070                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1071            }
1072        }
1073        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1074        byte readByte = is.readByte();
1075        KahaEntryType type = KahaEntryType.valueOf(readByte);
1076        if( type == null ) {
1077            try {
1078                is.close();
1079            } catch (IOException e) {}
1080            throw new IOException("Could not load journal record. Invalid location: "+location);
1081        }
1082        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1083        message.mergeFramed(is);
1084        return message;
1085    }
1086
1087    /**
1088     * do minimal recovery till we reach the last inDoubtLocation
1089     * @param data
1090     * @param location
1091     * @param inDoubtlocation
1092     * @throws IOException
1093     */
1094    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1095        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1096            process(data, location, (IndexAware) null);
1097        } else {
1098            // just recover producer audit
1099            data.visit(new Visitor() {
1100                @Override
1101                public void visit(KahaAddMessageCommand command) throws IOException {
1102                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1103                }
1104            });
1105        }
1106    }
1107
1108    // /////////////////////////////////////////////////////////////////
1109    // Journaled record processing methods. Once the record is journaled,
1110    // these methods handle applying the index updates. These may be called
1111    // from the recovery method too so they need to be idempotent
1112    // /////////////////////////////////////////////////////////////////
1113
1114    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
1115        data.visit(new Visitor() {
1116            @Override
1117            public void visit(KahaAddMessageCommand command) throws IOException {
1118                process(command, location, onSequenceAssignedCallback);
1119            }
1120
1121            @Override
1122            public void visit(KahaRemoveMessageCommand command) throws IOException {
1123                process(command, location);
1124            }
1125
1126            @Override
1127            public void visit(KahaPrepareCommand command) throws IOException {
1128                process(command, location);
1129            }
1130
1131            @Override
1132            public void visit(KahaCommitCommand command) throws IOException {
1133                process(command, location, onSequenceAssignedCallback);
1134            }
1135
1136            @Override
1137            public void visit(KahaRollbackCommand command) throws IOException {
1138                process(command, location);
1139            }
1140
1141            @Override
1142            public void visit(KahaRemoveDestinationCommand command) throws IOException {
1143                process(command, location);
1144            }
1145
1146            @Override
1147            public void visit(KahaSubscriptionCommand command) throws IOException {
1148                process(command, location);
1149            }
1150
1151            @Override
1152            public void visit(KahaProducerAuditCommand command) throws IOException {
1153                processLocation(location);
1154            }
1155
1156            @Override
1157            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
1158                processLocation(location);
1159            }
1160
1161            @Override
1162            public void visit(KahaTraceCommand command) {
1163                processLocation(location);
1164            }
1165
1166            @Override
1167            public void visit(KahaUpdateMessageCommand command) throws IOException {
1168                process(command, location);
1169            }
1170        });
1171    }
1172
1173    @SuppressWarnings("rawtypes")
1174    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1175        if (command.hasTransactionInfo()) {
1176            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1177            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1178        } else {
1179            this.indexLock.writeLock().lock();
1180            try {
1181                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1182                    @Override
1183                    public void execute(Transaction tx) throws IOException {
1184                        long assignedIndex = updateIndex(tx, command, location);
1185                        if (runWithIndexLock != null) {
1186                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1187                        }
1188                    }
1189                });
1190
1191            } finally {
1192                this.indexLock.writeLock().unlock();
1193            }
1194        }
1195    }
1196
1197    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1198        this.indexLock.writeLock().lock();
1199        try {
1200            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1201                @Override
1202                public void execute(Transaction tx) throws IOException {
1203                    updateIndex(tx, command, location);
1204                }
1205            });
1206        } finally {
1207            this.indexLock.writeLock().unlock();
1208        }
1209    }
1210
1211    @SuppressWarnings("rawtypes")
1212    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1213        if (command.hasTransactionInfo()) {
1214           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1215           inflightTx.add(new RemoveOperation(command, location));
1216        } else {
1217            this.indexLock.writeLock().lock();
1218            try {
1219                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1220                    @Override
1221                    public void execute(Transaction tx) throws IOException {
1222                        updateIndex(tx, command, location);
1223                    }
1224                });
1225            } finally {
1226                this.indexLock.writeLock().unlock();
1227            }
1228        }
1229    }
1230
1231    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1232        this.indexLock.writeLock().lock();
1233        try {
1234            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1235                @Override
1236                public void execute(Transaction tx) throws IOException {
1237                    updateIndex(tx, command, location);
1238                }
1239            });
1240        } finally {
1241            this.indexLock.writeLock().unlock();
1242        }
1243    }
1244
1245    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1246        this.indexLock.writeLock().lock();
1247        try {
1248            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1249                @Override
1250                public void execute(Transaction tx) throws IOException {
1251                    updateIndex(tx, command, location);
1252                }
1253            });
1254        } finally {
1255            this.indexLock.writeLock().unlock();
1256        }
1257    }
1258
1259    protected void processLocation(final Location location) {
1260        this.indexLock.writeLock().lock();
1261        try {
1262            metadata.lastUpdate = location;
1263        } finally {
1264            this.indexLock.writeLock().unlock();
1265        }
1266    }
1267
1268    @SuppressWarnings("rawtypes")
1269    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1270        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1271        List<Operation> inflightTx;
1272        synchronized (inflightTransactions) {
1273            inflightTx = inflightTransactions.remove(key);
1274            if (inflightTx == null) {
1275                inflightTx = preparedTransactions.remove(key);
1276            }
1277        }
1278        if (inflightTx == null) {
1279            // only non persistent messages in this tx
1280            if (before != null) {
1281                before.sequenceAssignedWithIndexLocked(-1);
1282            }
1283            return;
1284        }
1285
1286        final List<Operation> messagingTx = inflightTx;
1287        indexLock.writeLock().lock();
1288        try {
1289            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1290                @Override
1291                public void execute(Transaction tx) throws IOException {
1292                    for (Operation op : messagingTx) {
1293                        op.execute(tx);
1294                    }
1295                }
1296            });
1297            metadata.lastUpdate = location;
1298        } finally {
1299            indexLock.writeLock().unlock();
1300        }
1301    }
1302
1303    @SuppressWarnings("rawtypes")
1304    protected void process(KahaPrepareCommand command, Location location) {
1305        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1306        synchronized (inflightTransactions) {
1307            List<Operation> tx = inflightTransactions.remove(key);
1308            if (tx != null) {
1309                preparedTransactions.put(key, tx);
1310            }
1311        }
1312    }
1313
1314    @SuppressWarnings("rawtypes")
1315    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1316        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1317        List<Operation> updates = null;
1318        synchronized (inflightTransactions) {
1319            updates = inflightTransactions.remove(key);
1320            if (updates == null) {
1321                updates = preparedTransactions.remove(key);
1322            }
1323        }
1324    }
1325
1326    // /////////////////////////////////////////////////////////////////
1327    // These methods do the actual index updates.
1328    // /////////////////////////////////////////////////////////////////
1329
1330    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1331    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1332
1333    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1334        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1335
1336        // Skip adding the message to the index if this is a topic and there are
1337        // no subscriptions.
1338        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1339            return -1;
1340        }
1341
1342        // Add the message.
1343        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1344        long id = sd.orderIndex.getNextMessageId();
1345        Long previous = sd.locationIndex.put(tx, location, id);
1346        if (previous == null) {
1347            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1348            if (previous == null) {
1349                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1350                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1351                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1352                    addAckLocationForNewMessage(tx, sd, id);
1353                }
1354                metadata.lastUpdate = location;
1355            } else {
1356
1357                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1358                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
1359                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
1360                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1361                }
1362                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1363                sd.locationIndex.remove(tx, location);
1364                id = -1;
1365            }
1366        } else {
1367            // restore the previous value.. Looks like this was a redo of a previously
1368            // added message. We don't want to assign it a new id as the other indexes would
1369            // be wrong..
1370            sd.locationIndex.put(tx, location, previous);
1371            // ensure sequence is not broken
1372            sd.orderIndex.revertNextMessageId();
1373            metadata.lastUpdate = location;
1374        }
1375        // record this id in any event, initial send or recovery
1376        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1377
1378       return id;
1379    }
1380
1381    void trackPendingAdd(KahaDestination destination, Long seq) {
1382        StoredDestination sd = storedDestinations.get(key(destination));
1383        if (sd != null) {
1384            sd.trackPendingAdd(seq);
1385        }
1386    }
1387
1388    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1389        StoredDestination sd = storedDestinations.get(key(destination));
1390        if (sd != null) {
1391            sd.trackPendingAddComplete(seq);
1392        }
1393    }
1394
1395    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1396        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1397        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1398
1399        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1400        if (id != null) {
1401            MessageKeys previousKeys = sd.orderIndex.put(
1402                    tx,
1403                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1404                    id,
1405                    new MessageKeys(command.getMessageId(), location)
1406            );
1407            sd.locationIndex.put(tx, location, id);
1408            incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1409            // on first update previous is original location, on recovery/replay it may be the updated location
1410            if(previousKeys != null && !previousKeys.location.equals(location)) {
1411                sd.locationIndex.remove(tx, previousKeys.location);
1412                decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
1413            }
1414            metadata.lastUpdate = location;
1415        } else {
1416            //Add the message if it can't be found
1417            this.updateIndex(tx, command, location);
1418        }
1419    }
1420
1421    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1422        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1423        if (!command.hasSubscriptionKey()) {
1424
1425            // In the queue case we just remove the message from the index..
1426            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1427            if (sequenceId != null) {
1428                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1429                if (keys != null) {
1430                    sd.locationIndex.remove(tx, keys.location);
1431                    decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
1432                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1433                    metadata.lastUpdate = ackLocation;
1434                }  else if (LOG.isDebugEnabled()) {
1435                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1436                }
1437            } else if (LOG.isDebugEnabled()) {
1438                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1439            }
1440        } else {
1441            // In the topic case we need remove the message once it's been acked
1442            // by all the subs
1443            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1444
1445            // Make sure it's a valid message id...
1446            if (sequence != null) {
1447                String subscriptionKey = command.getSubscriptionKey();
1448                if (command.getAck() != UNMATCHED) {
1449                    sd.orderIndex.get(tx, sequence);
1450                    byte priority = sd.orderIndex.lastGetPriority();
1451                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1452                }
1453
1454                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1455                if (keys != null) {
1456                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1457                }
1458                // The following method handles deleting un-referenced messages.
1459                removeAckLocation(command, tx, sd, subscriptionKey, sequence);
1460                metadata.lastUpdate = ackLocation;
1461            } else if (LOG.isDebugEnabled()) {
1462                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1463            }
1464
1465        }
1466    }
1467
1468    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1469        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1470        if (referenceFileIds == null) {
1471            referenceFileIds = new HashSet<Integer>();
1472            referenceFileIds.add(messageLocation.getDataFileId());
1473            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1474        } else {
1475            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1476            if (!referenceFileIds.contains(id)) {
1477                referenceFileIds.add(id);
1478            }
1479        }
1480    }
1481
1482    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1483        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1484        sd.orderIndex.remove(tx);
1485
1486        sd.locationIndex.clear(tx);
1487        sd.locationIndex.unload(tx);
1488        tx.free(sd.locationIndex.getPageId());
1489
1490        sd.messageIdIndex.clear(tx);
1491        sd.messageIdIndex.unload(tx);
1492        tx.free(sd.messageIdIndex.getPageId());
1493
1494        if (sd.subscriptions != null) {
1495            sd.subscriptions.clear(tx);
1496            sd.subscriptions.unload(tx);
1497            tx.free(sd.subscriptions.getPageId());
1498
1499            sd.subscriptionAcks.clear(tx);
1500            sd.subscriptionAcks.unload(tx);
1501            tx.free(sd.subscriptionAcks.getPageId());
1502
1503            sd.ackPositions.clear(tx);
1504            sd.ackPositions.unload(tx);
1505            tx.free(sd.ackPositions.getHeadPageId());
1506
1507            sd.subLocations.clear(tx);
1508            sd.subLocations.unload(tx);
1509            tx.free(sd.subLocations.getHeadPageId());
1510        }
1511
1512        String key = key(command.getDestination());
1513        storedDestinations.remove(key);
1514        metadata.destinations.remove(tx, key);
1515        clearStoreStats(command.getDestination());
1516        storeCache.remove(key(command.getDestination()));
1517    }
1518
1519    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1520        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1521        final String subscriptionKey = command.getSubscriptionKey();
1522
1523        // If set then we are creating it.. otherwise we are destroying the sub
1524        if (command.hasSubscriptionInfo()) {
1525            Location existing = sd.subLocations.get(tx, subscriptionKey);
1526            if (existing != null && existing.compareTo(location) == 0) {
1527                // replay on recovery, ignore
1528                LOG.trace("ignoring journal replay of replay of sub from: " + location);
1529                return;
1530            }
1531
1532            sd.subscriptions.put(tx, subscriptionKey, command);
1533            sd.subLocations.put(tx, subscriptionKey, location);
1534            long ackLocation=NOT_ACKED;
1535            if (!command.getRetroactive()) {
1536                ackLocation = sd.orderIndex.nextMessageId-1;
1537            } else {
1538                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1539            }
1540            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1541            sd.subscriptionCache.add(subscriptionKey);
1542        } else {
1543            // delete the sub...
1544            sd.subscriptions.remove(tx, subscriptionKey);
1545            sd.subLocations.remove(tx, subscriptionKey);
1546            sd.subscriptionAcks.remove(tx, subscriptionKey);
1547            sd.subscriptionCache.remove(subscriptionKey);
1548            removeAckLocationsForSub(command, tx, sd, subscriptionKey);
1549
1550            if (sd.subscriptions.isEmpty(tx)) {
1551                // remove the stored destination
1552                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1553                removeDestinationCommand.setDestination(command.getDestination());
1554                updateIndex(tx, removeDestinationCommand, null);
1555                clearStoreStats(command.getDestination());
1556            }
1557        }
1558    }
1559
1560    private void checkpointUpdate(final boolean cleanup) throws IOException {
1561        checkpointLock.writeLock().lock();
1562        try {
1563            this.indexLock.writeLock().lock();
1564            try {
1565                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1566                    @Override
1567                    public void execute(Transaction tx) throws IOException {
1568                        checkpointUpdate(tx, cleanup);
1569                    }
1570                });
1571            } finally {
1572                this.indexLock.writeLock().unlock();
1573            }
1574
1575        } finally {
1576            checkpointLock.writeLock().unlock();
1577        }
1578    }
1579
1580    /**
1581     * @param tx
1582     * @throws IOException
1583     */
1584    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1585        LOG.debug("Checkpoint started.");
1586
1587        // reflect last update exclusive of current checkpoint
1588        Location lastUpdate = metadata.lastUpdate;
1589
1590        metadata.state = OPEN_STATE;
1591        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1592        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1593        Location[] inProgressTxRange = getInProgressTxLocationRange();
1594        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1595        tx.store(metadata.page, metadataMarshaller, true);
1596        pageFile.flush();
1597
1598        if( cleanup ) {
1599
1600            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1601            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1602
1603            if (LOG.isTraceEnabled()) {
1604                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1605            }
1606
1607            if (lastUpdate != null) {
1608                gcCandidateSet.remove(lastUpdate.getDataFileId());
1609            }
1610
1611            // Don't GC files under replication
1612            if( journalFilesBeingReplicated!=null ) {
1613                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1614            }
1615
1616            if (metadata.producerSequenceIdTrackerLocation != null) {
1617                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1618                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1619                    // rewrite so we don't prevent gc
1620                    metadata.producerSequenceIdTracker.setModified(true);
1621                    if (LOG.isTraceEnabled()) {
1622                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1623                    }
1624                }
1625                gcCandidateSet.remove(dataFileId);
1626                if (LOG.isTraceEnabled()) {
1627                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet);
1628                }
1629            }
1630
1631            if (metadata.ackMessageFileMapLocation != null) {
1632                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1633                gcCandidateSet.remove(dataFileId);
1634                if (LOG.isTraceEnabled()) {
1635                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
1636                }
1637            }
1638
1639            // Don't GC files referenced by in-progress tx
1640            if (inProgressTxRange[0] != null) {
1641                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1642                    gcCandidateSet.remove(pendingTx);
1643                }
1644            }
1645            if (LOG.isTraceEnabled()) {
1646                LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1647            }
1648
1649            // Go through all the destinations to see if any of them can remove GC candidates.
1650            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1651                if( gcCandidateSet.isEmpty() ) {
1652                    break;
1653                }
1654
1655                // Use a visitor to cut down the number of pages that we load
1656                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1657                    int last=-1;
1658                    @Override
1659                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1660                        if( first==null ) {
1661                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1662                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1663                                subset.remove(second.getDataFileId());
1664                            }
1665                            return !subset.isEmpty();
1666                        } else if( second==null ) {
1667                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1668                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1669                                subset.remove(first.getDataFileId());
1670                            }
1671                            return !subset.isEmpty();
1672                        } else {
1673                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1674                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1675                                subset.remove(first.getDataFileId());
1676                            }
1677                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1678                                subset.remove(second.getDataFileId());
1679                            }
1680                            return !subset.isEmpty();
1681                        }
1682                    }
1683
1684                    @Override
1685                    public void visit(List<Location> keys, List<Long> values) {
1686                        for (Location l : keys) {
1687                            int fileId = l.getDataFileId();
1688                            if( last != fileId ) {
1689                                gcCandidateSet.remove(fileId);
1690                                last = fileId;
1691                            }
1692                        }
1693                    }
1694                });
1695
1696                // Durable Subscription
1697                if (entry.getValue().subLocations != null) {
1698                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1699                    while (iter.hasNext()) {
1700                        Entry<String, Location> subscription = iter.next();
1701                        int dataFileId = subscription.getValue().getDataFileId();
1702
1703                        // Move subscription along if it has no outstanding messages that need ack'd
1704                        // and its in the last log file in the journal.
1705                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1706                            final StoredDestination destination = entry.getValue();
1707                            final String subscriptionKey = subscription.getKey();
1708                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1709
1710                            // When pending is size one that is the next message Id meaning there
1711                            // are no pending messages currently.
1712                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1713                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1714
1715                                if (LOG.isTraceEnabled()) {
1716                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1717                                }
1718
1719                                final KahaSubscriptionCommand kahaSub =
1720                                    destination.subscriptions.get(tx, subscriptionKey);
1721                                destination.subLocations.put(
1722                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1723
1724                                // Skips the remove from candidates if we rewrote the subscription
1725                                // in order to prevent duplicate subscription commands on recover.
1726                                // If another subscription is on the same file and isn't rewritten
1727                                // than it will remove the file from the set.
1728                                continue;
1729                            }
1730                        }
1731
1732                        gcCandidateSet.remove(dataFileId);
1733                    }
1734                }
1735
1736                if (LOG.isTraceEnabled()) {
1737                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1738                }
1739            }
1740
1741            // check we are not deleting file with ack for in-use journal files
1742            if (LOG.isTraceEnabled()) {
1743                LOG.trace("gc candidates: " + gcCandidateSet);
1744                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1745            }
1746            boolean ackMessageFileMapMod = false;
1747            Iterator<Integer> candidates = gcCandidateSet.iterator();
1748            while (candidates.hasNext()) {
1749                Integer candidate = candidates.next();
1750                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1751                if (referencedFileIds != null) {
1752                    for (Integer referencedFileId : referencedFileIds) {
1753                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1754                            // active file that is not targeted for deletion is referenced so don't delete
1755                            candidates.remove();
1756                            break;
1757                        }
1758                    }
1759                    if (gcCandidateSet.contains(candidate)) {
1760                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1761                    } else {
1762                        if (LOG.isTraceEnabled()) {
1763                            LOG.trace("not removing data file: " + candidate
1764                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1765                        }
1766                    }
1767                }
1768            }
1769
1770            if (!gcCandidateSet.isEmpty()) {
1771                if (LOG.isDebugEnabled()) {
1772                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
1773                }
1774                journal.removeDataFiles(gcCandidateSet);
1775                for (Integer candidate : gcCandidateSet) {
1776                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1777                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1778                    }
1779                }
1780                if (ackMessageFileMapMod) {
1781                    checkpointUpdate(tx, false);
1782                }
1783            }
1784        }
1785
1786        LOG.debug("Checkpoint done.");
1787    }
1788
1789    final Runnable nullCompletionCallback = new Runnable() {
1790        @Override
1791        public void run() {
1792        }
1793    };
1794
1795    private Location checkpointProducerAudit() throws IOException {
1796        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
1797            ByteArrayOutputStream baos = new ByteArrayOutputStream();
1798            ObjectOutputStream oout = new ObjectOutputStream(baos);
1799            oout.writeObject(metadata.producerSequenceIdTracker);
1800            oout.flush();
1801            oout.close();
1802            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1803            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1804            try {
1805                location.getLatch().await();
1806            } catch (InterruptedException e) {
1807                throw new InterruptedIOException(e.toString());
1808            }
1809            return location;
1810        }
1811        return metadata.producerSequenceIdTrackerLocation;
1812    }
1813
1814    private Location checkpointAckMessageFileMap() throws IOException {
1815        ByteArrayOutputStream baos = new ByteArrayOutputStream();
1816        ObjectOutputStream oout = new ObjectOutputStream(baos);
1817        oout.writeObject(metadata.ackMessageFileMap);
1818        oout.flush();
1819        oout.close();
1820        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1821        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
1822        try {
1823            location.getLatch().await();
1824        } catch (InterruptedException e) {
1825            throw new InterruptedIOException(e.toString());
1826        }
1827        return location;
1828    }
1829
1830    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
1831
1832        ByteSequence sequence = toByteSequence(subscription);
1833        Location location = journal.write(sequence, nullCompletionCallback) ;
1834
1835        try {
1836            location.getLatch().await();
1837        } catch (InterruptedException e) {
1838            throw new InterruptedIOException(e.toString());
1839        }
1840        return location;
1841    }
1842
1843    public HashSet<Integer> getJournalFilesBeingReplicated() {
1844        return journalFilesBeingReplicated;
1845    }
1846
1847    // /////////////////////////////////////////////////////////////////
1848    // StoredDestination related implementation methods.
1849    // /////////////////////////////////////////////////////////////////
1850
1851    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1852
1853    static class MessageKeys {
1854        final String messageId;
1855        final Location location;
1856
1857        public MessageKeys(String messageId, Location location) {
1858            this.messageId=messageId;
1859            this.location=location;
1860        }
1861
1862        @Override
1863        public String toString() {
1864            return "["+messageId+","+location+"]";
1865        }
1866    }
1867
1868    protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1869        final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
1870
1871        @Override
1872        public MessageKeys readPayload(DataInput dataIn) throws IOException {
1873            return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
1874        }
1875
1876        @Override
1877        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1878            dataOut.writeUTF(object.messageId);
1879            locationSizeMarshaller.writePayload(object.location, dataOut);
1880        }
1881    }
1882
1883    class LastAck {
1884        long lastAckedSequence;
1885        byte priority;
1886
1887        public LastAck(LastAck source) {
1888            this.lastAckedSequence = source.lastAckedSequence;
1889            this.priority = source.priority;
1890        }
1891
1892        public LastAck() {
1893            this.priority = MessageOrderIndex.HI;
1894        }
1895
1896        public LastAck(long ackLocation) {
1897            this.lastAckedSequence = ackLocation;
1898            this.priority = MessageOrderIndex.LO;
1899        }
1900
1901        public LastAck(long ackLocation, byte priority) {
1902            this.lastAckedSequence = ackLocation;
1903            this.priority = priority;
1904        }
1905
1906        @Override
1907        public String toString() {
1908            return "[" + lastAckedSequence + ":" + priority + "]";
1909        }
1910    }
1911
1912    protected class LastAckMarshaller implements Marshaller<LastAck> {
1913
1914        @Override
1915        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1916            dataOut.writeLong(object.lastAckedSequence);
1917            dataOut.writeByte(object.priority);
1918        }
1919
1920        @Override
1921        public LastAck readPayload(DataInput dataIn) throws IOException {
1922            LastAck lastAcked = new LastAck();
1923            lastAcked.lastAckedSequence = dataIn.readLong();
1924            if (metadata.version >= 3) {
1925                lastAcked.priority = dataIn.readByte();
1926            }
1927            return lastAcked;
1928        }
1929
1930        @Override
1931        public int getFixedSize() {
1932            return 9;
1933        }
1934
1935        @Override
1936        public LastAck deepCopy(LastAck source) {
1937            return new LastAck(source);
1938        }
1939
1940        @Override
1941        public boolean isDeepCopySupported() {
1942            return true;
1943        }
1944    }
1945
1946
1947    class StoredDestination {
1948
1949        MessageOrderIndex orderIndex = new MessageOrderIndex();
1950        BTreeIndex<Location, Long> locationIndex;
1951        BTreeIndex<String, Long> messageIdIndex;
1952
1953        // These bits are only set for Topics
1954        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
1955        BTreeIndex<String, LastAck> subscriptionAcks;
1956        HashMap<String, MessageOrderCursor> subscriptionCursors;
1957        ListIndex<String, SequenceSet> ackPositions;
1958        ListIndex<String, Location> subLocations;
1959
1960        // Transient data used to track which Messages are no longer needed.
1961        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
1962        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
1963
1964        public void trackPendingAdd(Long seq) {
1965            orderIndex.trackPendingAdd(seq);
1966        }
1967
1968        public void trackPendingAddComplete(Long seq) {
1969            orderIndex.trackPendingAddComplete(seq);
1970        }
1971
1972        @Override
1973        public String toString() {
1974            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
1975        }
1976    }
1977
1978    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1979
1980        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
1981
1982        @Override
1983        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
1984            final StoredDestination value = new StoredDestination();
1985            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1986            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1987            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1988
1989            if (dataIn.readBoolean()) {
1990                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1991                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
1992                if (metadata.version >= 4) {
1993                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
1994                } else {
1995                    // upgrade
1996                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1997                        @Override
1998                        public void execute(Transaction tx) throws IOException {
1999                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
2000
2001                            if (metadata.version >= 3) {
2002                                // migrate
2003                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
2004                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
2005                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
2006                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
2007                                oldAckPositions.load(tx);
2008
2009
2010                                // Do the initial build of the data in memory before writing into the store
2011                                // based Ack Positions List to avoid a lot of disk thrashing.
2012                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
2013                                while (iterator.hasNext()) {
2014                                    Entry<Long, HashSet<String>> entry = iterator.next();
2015
2016                                    for(String subKey : entry.getValue()) {
2017                                        SequenceSet pendingAcks = temp.get(subKey);
2018                                        if (pendingAcks == null) {
2019                                            pendingAcks = new SequenceSet();
2020                                            temp.put(subKey, pendingAcks);
2021                                        }
2022
2023                                        pendingAcks.add(entry.getKey());
2024                                    }
2025                                }
2026                            }
2027                            // Now move the pending messages to ack data into the store backed
2028                            // structure.
2029                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2030                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2031                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2032                            value.ackPositions.load(tx);
2033                            for(String subscriptionKey : temp.keySet()) {
2034                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
2035                            }
2036
2037                        }
2038                    });
2039                }
2040
2041                if (metadata.version >= 5) {
2042                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
2043                } else {
2044                    // upgrade
2045                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2046                        @Override
2047                        public void execute(Transaction tx) throws IOException {
2048                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2049                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2050                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2051                            value.subLocations.load(tx);
2052                        }
2053                    });
2054                }
2055            }
2056            if (metadata.version >= 2) {
2057                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2058                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2059            } else {
2060                // upgrade
2061                pageFile.tx().execute(new Transaction.Closure<IOException>() {
2062                    @Override
2063                    public void execute(Transaction tx) throws IOException {
2064                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2065                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2066                        value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2067                        value.orderIndex.lowPriorityIndex.load(tx);
2068
2069                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2070                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2071                        value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2072                        value.orderIndex.highPriorityIndex.load(tx);
2073                    }
2074                });
2075            }
2076
2077            return value;
2078        }
2079
2080        @Override
2081        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
2082            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
2083            dataOut.writeLong(value.locationIndex.getPageId());
2084            dataOut.writeLong(value.messageIdIndex.getPageId());
2085            if (value.subscriptions != null) {
2086                dataOut.writeBoolean(true);
2087                dataOut.writeLong(value.subscriptions.getPageId());
2088                dataOut.writeLong(value.subscriptionAcks.getPageId());
2089                dataOut.writeLong(value.ackPositions.getHeadPageId());
2090                dataOut.writeLong(value.subLocations.getHeadPageId());
2091            } else {
2092                dataOut.writeBoolean(false);
2093            }
2094            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
2095            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
2096        }
2097    }
2098
2099    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2100        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2101
2102        @Override
2103        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2104            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2105            rc.mergeFramed((InputStream)dataIn);
2106            return rc;
2107        }
2108
2109        @Override
2110        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2111            object.writeFramed((OutputStream)dataOut);
2112        }
2113    }
2114
2115    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2116        String key = key(destination);
2117        StoredDestination rc = storedDestinations.get(key);
2118        if (rc == null) {
2119            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2120            rc = loadStoredDestination(tx, key, topic);
2121            // Cache it. We may want to remove/unload destinations from the
2122            // cache that are not used for a while
2123            // to reduce memory usage.
2124            storedDestinations.put(key, rc);
2125        }
2126        return rc;
2127    }
2128
2129    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2130        String key = key(destination);
2131        StoredDestination rc = storedDestinations.get(key);
2132        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2133            rc = getStoredDestination(destination, tx);
2134        }
2135        return rc;
2136    }
2137
2138    /**
2139     * @param tx
2140     * @param key
2141     * @param topic
2142     * @return
2143     * @throws IOException
2144     */
2145    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2146        // Try to load the existing indexes..
2147        StoredDestination rc = metadata.destinations.get(tx, key);
2148        if (rc == null) {
2149            // Brand new destination.. allocate indexes for it.
2150            rc = new StoredDestination();
2151            rc.orderIndex.allocate(tx);
2152            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
2153            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
2154
2155            if (topic) {
2156                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
2157                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
2158                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2159                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2160            }
2161            metadata.destinations.put(tx, key, rc);
2162        }
2163
2164        // Configure the marshalers and load.
2165        rc.orderIndex.load(tx);
2166
2167        // Figure out the next key using the last entry in the destination.
2168        rc.orderIndex.configureLast(tx);
2169
2170        rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
2171        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2172        rc.locationIndex.load(tx);
2173
2174        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2175        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2176        rc.messageIdIndex.load(tx);
2177
2178        //go through an upgrade old index if older than version 6
2179        if (metadata.version < 6) {
2180            for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) {
2181                Entry<Location, Long> entry = iterator.next();
2182                // modify so it is upgraded
2183                rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
2184            }
2185            //upgrade the order index
2186            for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) {
2187                Entry<Long, MessageKeys> entry = iterator.next();
2188                //call get so that the last priority is updated
2189                rc.orderIndex.get(tx, entry.getKey());
2190                rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
2191            }
2192        }
2193
2194        // If it was a topic...
2195        if (topic) {
2196
2197            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2198            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2199            rc.subscriptions.load(tx);
2200
2201            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2202            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2203            rc.subscriptionAcks.load(tx);
2204
2205            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2206            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2207            rc.ackPositions.load(tx);
2208
2209            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2210            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2211            rc.subLocations.load(tx);
2212
2213            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
2214
2215            if (metadata.version < 3) {
2216
2217                // on upgrade need to fill ackLocation with available messages past last ack
2218                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2219                    Entry<String, LastAck> entry = iterator.next();
2220                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2221                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2222                        Long sequence = orderIterator.next().getKey();
2223                        addAckLocation(tx, rc, sequence, entry.getKey());
2224                    }
2225                    // modify so it is upgraded
2226                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2227                }
2228            }
2229
2230            // Configure the message references index
2231            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2232            while (subscriptions.hasNext()) {
2233                Entry<String, SequenceSet> subscription = subscriptions.next();
2234                SequenceSet pendingAcks = subscription.getValue();
2235                if (pendingAcks != null && !pendingAcks.isEmpty()) {
2236                    Long lastPendingAck = pendingAcks.getTail().getLast();
2237                    for (Long sequenceId : pendingAcks) {
2238                        Long current = rc.messageReferences.get(sequenceId);
2239                        if (current == null) {
2240                            current = new Long(0);
2241                        }
2242
2243                        // We always add a trailing empty entry for the next position to start from
2244                        // so we need to ensure we don't count that as a message reference on reload.
2245                        if (!sequenceId.equals(lastPendingAck)) {
2246                            current = current.longValue() + 1;
2247                        } else {
2248                            current = Long.valueOf(0L);
2249                        }
2250
2251                        rc.messageReferences.put(sequenceId, current);
2252                    }
2253                }
2254            }
2255
2256            // Configure the subscription cache
2257            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2258                Entry<String, LastAck> entry = iterator.next();
2259                rc.subscriptionCache.add(entry.getKey());
2260            }
2261
2262            if (rc.orderIndex.nextMessageId == 0) {
2263                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2264                if (!rc.subscriptionAcks.isEmpty(tx)) {
2265                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2266                        Entry<String, LastAck> entry = iterator.next();
2267                        rc.orderIndex.nextMessageId =
2268                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2269                    }
2270                }
2271            } else {
2272                // update based on ackPositions for unmatched, last entry is always the next
2273                if (!rc.messageReferences.isEmpty()) {
2274                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
2275                    rc.orderIndex.nextMessageId =
2276                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
2277                }
2278            }
2279        }
2280
2281        if (metadata.version < VERSION) {
2282            // store again after upgrade
2283            metadata.destinations.put(tx, key, rc);
2284        }
2285        return rc;
2286    }
2287
2288    /**
2289     * Clear the counter for the destination, if one exists.
2290     *
2291     * @param kahaDestination
2292     */
2293    protected void clearStoreStats(KahaDestination kahaDestination) {
2294        MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination));
2295        if (storeStats != null) {
2296            storeStats.reset();
2297        }
2298    }
2299
2300    /**
2301     * Update MessageStoreStatistics
2302     *
2303     * @param kahaDestination
2304     * @param size
2305     */
2306    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
2307        incrementAndAddSizeToStoreStat(key(kahaDestination), size);
2308    }
2309
2310    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
2311        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2312        if (storeStats != null) {
2313            storeStats.getMessageCount().increment();
2314            if (size > 0) {
2315                storeStats.getMessageSize().addSize(size);
2316            }
2317        }
2318    }
2319
2320    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
2321        decrementAndSubSizeToStoreStat(key(kahaDestination), size);
2322    }
2323
2324    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
2325        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2326        if (storeStats != null) {
2327            storeStats.getMessageCount().decrement();
2328            if (size > 0) {
2329                storeStats.getMessageSize().addSize(-size);
2330            }
2331        }
2332    }
2333
2334    /**
2335     * This is a map to cache DestinationStatistics for a specific
2336     * KahaDestination key
2337     */
2338    protected final ConcurrentMap<String, MessageStore> storeCache =
2339            new ConcurrentHashMap<String, MessageStore>();
2340
2341    /**
2342     * Locate the storeMessageSize counter for this KahaDestination
2343     * @param kahaDestination
2344     * @return
2345     */
2346    protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
2347        MessageStoreStatistics storeStats = null;
2348        try {
2349            MessageStore messageStore = storeCache.get(kahaDestKey);
2350            if (messageStore != null) {
2351                storeStats = messageStore.getMessageStoreStatistics();
2352            }
2353        } catch (Exception e1) {
2354             LOG.error("Getting size counter of destination failed", e1);
2355        }
2356
2357        return storeStats;
2358    }
2359
2360    /**
2361     * Determine whether this Destination matches the DestinationType
2362     *
2363     * @param destination
2364     * @param type
2365     * @return
2366     */
2367    protected boolean matchType(Destination destination,
2368            KahaDestination.DestinationType type) {
2369        if (destination instanceof Topic
2370                && type.equals(KahaDestination.DestinationType.TOPIC)) {
2371            return true;
2372        } else if (destination instanceof Queue
2373                && type.equals(KahaDestination.DestinationType.QUEUE)) {
2374            return true;
2375        }
2376        return false;
2377    }
2378
2379    class LocationSizeMarshaller implements Marshaller<Location> {
2380
2381        public LocationSizeMarshaller() {
2382
2383        }
2384
2385        @Override
2386        public Location readPayload(DataInput dataIn) throws IOException {
2387            Location rc = new Location();
2388            rc.setDataFileId(dataIn.readInt());
2389            rc.setOffset(dataIn.readInt());
2390            if (metadata.version >= 6) {
2391                rc.setSize(dataIn.readInt());
2392            }
2393            return rc;
2394        }
2395
2396        @Override
2397        public void writePayload(Location object, DataOutput dataOut)
2398                throws IOException {
2399            dataOut.writeInt(object.getDataFileId());
2400            dataOut.writeInt(object.getOffset());
2401            dataOut.writeInt(object.getSize());
2402        }
2403
2404        @Override
2405        public int getFixedSize() {
2406            return 12;
2407        }
2408
2409        @Override
2410        public Location deepCopy(Location source) {
2411            return new Location(source);
2412        }
2413
2414        @Override
2415        public boolean isDeepCopySupported() {
2416            return true;
2417        }
2418    }
2419
2420    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2421        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2422        if (sequences == null) {
2423            sequences = new SequenceSet();
2424            sequences.add(messageSequence);
2425            sd.ackPositions.add(tx, subscriptionKey, sequences);
2426        } else {
2427            sequences.add(messageSequence);
2428            sd.ackPositions.put(tx, subscriptionKey, sequences);
2429        }
2430
2431        Long count = sd.messageReferences.get(messageSequence);
2432        if (count == null) {
2433            count = Long.valueOf(0L);
2434        }
2435        count = count.longValue() + 1;
2436        sd.messageReferences.put(messageSequence, count);
2437    }
2438
2439    // new sub is interested in potentially all existing messages
2440    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2441        SequenceSet allOutstanding = new SequenceSet();
2442        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2443        while (iterator.hasNext()) {
2444            SequenceSet set = iterator.next().getValue();
2445            for (Long entry : set) {
2446                allOutstanding.add(entry);
2447            }
2448        }
2449        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2450
2451        for (Long ackPosition : allOutstanding) {
2452            Long count = sd.messageReferences.get(ackPosition);
2453
2454            // There might not be a reference if the ackLocation was the last
2455            // one which is a placeholder for the next incoming message and
2456            // no value was added to the message references table.
2457            if (count != null) {
2458                count = count.longValue() + 1;
2459                sd.messageReferences.put(ackPosition, count);
2460            }
2461        }
2462    }
2463
2464    // on a new message add, all existing subs are interested in this message
2465    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
2466        for(String subscriptionKey : sd.subscriptionCache) {
2467            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2468            if (sequences == null) {
2469                sequences = new SequenceSet();
2470                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2471                sd.ackPositions.add(tx, subscriptionKey, sequences);
2472            } else {
2473                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2474                sd.ackPositions.put(tx, subscriptionKey, sequences);
2475            }
2476
2477            Long count = sd.messageReferences.get(messageSequence);
2478            if (count == null) {
2479                count = Long.valueOf(0L);
2480            }
2481            count = count.longValue() + 1;
2482            sd.messageReferences.put(messageSequence, count);
2483            sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
2484        }
2485    }
2486
2487    private void removeAckLocationsForSub(KahaSubscriptionCommand command,
2488            Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2489        if (!sd.ackPositions.isEmpty(tx)) {
2490            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2491            if (sequences == null || sequences.isEmpty()) {
2492                return;
2493            }
2494
2495            ArrayList<Long> unreferenced = new ArrayList<Long>();
2496
2497            for(Long sequenceId : sequences) {
2498                Long references = sd.messageReferences.get(sequenceId);
2499                if (references != null) {
2500                    references = references.longValue() - 1;
2501
2502                    if (references.longValue() > 0) {
2503                        sd.messageReferences.put(sequenceId, references);
2504                    } else {
2505                        sd.messageReferences.remove(sequenceId);
2506                        unreferenced.add(sequenceId);
2507                    }
2508                }
2509            }
2510
2511            for(Long sequenceId : unreferenced) {
2512                // Find all the entries that need to get deleted.
2513                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2514                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2515
2516                // Do the actual deletes.
2517                for (Entry<Long, MessageKeys> entry : deletes) {
2518                    sd.locationIndex.remove(tx, entry.getValue().location);
2519                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2520                    sd.orderIndex.remove(tx, entry.getKey());
2521                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2522                }
2523            }
2524        }
2525    }
2526
2527    /**
2528     * @param tx
2529     * @param sd
2530     * @param subscriptionKey
2531     * @param messageSequence
2532     * @throws IOException
2533     */
2534    private void removeAckLocation(KahaRemoveMessageCommand command,
2535            Transaction tx, StoredDestination sd, String subscriptionKey,
2536            Long messageSequence) throws IOException {
2537        // Remove the sub from the previous location set..
2538        if (messageSequence != null) {
2539            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2540            if (range != null && !range.isEmpty()) {
2541                range.remove(messageSequence);
2542                if (!range.isEmpty()) {
2543                    sd.ackPositions.put(tx, subscriptionKey, range);
2544                } else {
2545                    sd.ackPositions.remove(tx, subscriptionKey);
2546                }
2547
2548                // Check if the message is reference by any other subscription.
2549                Long count = sd.messageReferences.get(messageSequence);
2550                if (count != null) {
2551                    long references = count.longValue() - 1;
2552                    if (references > 0) {
2553                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2554                        return;
2555                    } else {
2556                        sd.messageReferences.remove(messageSequence);
2557                    }
2558                }
2559
2560                // Find all the entries that need to get deleted.
2561                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2562                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2563
2564                // Do the actual deletes.
2565                for (Entry<Long, MessageKeys> entry : deletes) {
2566                    sd.locationIndex.remove(tx, entry.getValue().location);
2567                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2568                    sd.orderIndex.remove(tx, entry.getKey());
2569                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2570                }
2571            }
2572        }
2573    }
2574
2575    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2576        return sd.subscriptionAcks.get(tx, subscriptionKey);
2577    }
2578
2579    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2580        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2581        if (messageSequences != null) {
2582            long result = messageSequences.rangeSize();
2583            // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2584            return result > 0 ? result - 1 : 0;
2585        }
2586
2587        return 0;
2588    }
2589
2590    public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2591        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2592        long locationSize = 0;
2593        if (messageSequences != null) {
2594            Iterator<Long> sequences = messageSequences.iterator();
2595
2596            while (sequences.hasNext()) {
2597                Long sequenceId = sequences.next();
2598                //the last item is the next marker
2599                if (!sequences.hasNext()) {
2600                    break;
2601                }
2602                Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx);
2603                while (iterator.hasNext()) {
2604                    Entry<Location, Long> entry = iterator.next();
2605                    if (entry.getValue() == sequenceId - 1) {
2606                        locationSize += entry.getKey().getSize();
2607                        break;
2608                    }
2609
2610                }
2611            }
2612        }
2613
2614        return locationSize;
2615    }
2616    protected String key(KahaDestination destination) {
2617        return destination.getType().getNumber() + ":" + destination.getName();
2618    }
2619
2620    // /////////////////////////////////////////////////////////////////
2621    // Transaction related implementation methods.
2622    // /////////////////////////////////////////////////////////////////
2623    @SuppressWarnings("rawtypes")
2624    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2625    @SuppressWarnings("rawtypes")
2626    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2627    protected final Set<String> ackedAndPrepared = new HashSet<String>();
2628    protected final Set<String> rolledBackAcks = new HashSet<String>();
2629
2630    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2631    // till then they are skipped by the store.
2632    // 'at most once' XA guarantee
2633    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2634        this.indexLock.writeLock().lock();
2635        try {
2636            for (MessageAck ack : acks) {
2637                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
2638            }
2639        } finally {
2640            this.indexLock.writeLock().unlock();
2641        }
2642    }
2643
2644    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
2645        if (acks != null) {
2646            this.indexLock.writeLock().lock();
2647            try {
2648                for (MessageAck ack : acks) {
2649                    final String id = ack.getLastMessageId().toProducerKey();
2650                    ackedAndPrepared.remove(id);
2651                    if (rollback) {
2652                        rolledBackAcks.add(id);
2653                    }
2654                }
2655            } finally {
2656                this.indexLock.writeLock().unlock();
2657            }
2658        }
2659    }
2660
2661    @SuppressWarnings("rawtypes")
2662    private List<Operation> getInflightTx(KahaTransactionInfo info) {
2663        TransactionId key = TransactionIdConversion.convert(info);
2664        List<Operation> tx;
2665        synchronized (inflightTransactions) {
2666            tx = inflightTransactions.get(key);
2667            if (tx == null) {
2668                tx = Collections.synchronizedList(new ArrayList<Operation>());
2669                inflightTransactions.put(key, tx);
2670            }
2671        }
2672        return tx;
2673    }
2674
2675    @SuppressWarnings("unused")
2676    private TransactionId key(KahaTransactionInfo transactionInfo) {
2677        return TransactionIdConversion.convert(transactionInfo);
2678    }
2679
2680    abstract class Operation <T extends JournalCommand<T>> {
2681        final T command;
2682        final Location location;
2683
2684        public Operation(T command, Location location) {
2685            this.command = command;
2686            this.location = location;
2687        }
2688
2689        public Location getLocation() {
2690            return location;
2691        }
2692
2693        public T getCommand() {
2694            return command;
2695        }
2696
2697        abstract public void execute(Transaction tx) throws IOException;
2698    }
2699
2700    class AddOperation extends Operation<KahaAddMessageCommand> {
2701        final IndexAware runWithIndexLock;
2702        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
2703            super(command, location);
2704            this.runWithIndexLock = runWithIndexLock;
2705        }
2706
2707        @Override
2708        public void execute(Transaction tx) throws IOException {
2709            long seq = updateIndex(tx, command, location);
2710            if (runWithIndexLock != null) {
2711                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
2712            }
2713        }
2714
2715    }
2716
2717    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
2718
2719        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
2720            super(command, location);
2721        }
2722
2723        @Override
2724        public void execute(Transaction tx) throws IOException {
2725            updateIndex(tx, command, location);
2726        }
2727    }
2728
2729    // /////////////////////////////////////////////////////////////////
2730    // Initialization related implementation methods.
2731    // /////////////////////////////////////////////////////////////////
2732
2733    private PageFile createPageFile() throws IOException {
2734        if( indexDirectory == null ) {
2735            indexDirectory = directory;
2736        }
2737        IOHelper.mkdirs(indexDirectory);
2738        PageFile index = new PageFile(indexDirectory, "db");
2739        index.setEnableWriteThread(isEnableIndexWriteAsync());
2740        index.setWriteBatchSize(getIndexWriteBatchSize());
2741        index.setPageCacheSize(indexCacheSize);
2742        index.setUseLFRUEviction(isUseIndexLFRUEviction());
2743        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2744        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2745        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2746        index.setEnablePageCaching(isEnableIndexPageCaching());
2747        return index;
2748    }
2749
2750    private Journal createJournal() throws IOException {
2751        Journal manager = new Journal();
2752        manager.setDirectory(directory);
2753        manager.setMaxFileLength(getJournalMaxFileLength());
2754        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2755        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2756        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2757        manager.setArchiveDataLogs(isArchiveDataLogs());
2758        manager.setSizeAccumulator(journalSize);
2759        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2760        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
2761        manager.setPreallocationStrategy(
2762                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
2763        if (getDirectoryArchive() != null) {
2764            IOHelper.mkdirs(getDirectoryArchive());
2765            manager.setDirectoryArchive(getDirectoryArchive());
2766        }
2767        return manager;
2768    }
2769
2770    private Metadata createMetadata() {
2771        Metadata md = new Metadata();
2772        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
2773        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
2774        return md;
2775    }
2776
2777    protected abstract void configureMetadata();
2778
2779    public int getJournalMaxWriteBatchSize() {
2780        return journalMaxWriteBatchSize;
2781    }
2782
2783    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2784        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2785    }
2786
2787    public File getDirectory() {
2788        return directory;
2789    }
2790
2791    public void setDirectory(File directory) {
2792        this.directory = directory;
2793    }
2794
2795    public boolean isDeleteAllMessages() {
2796        return deleteAllMessages;
2797    }
2798
2799    public void setDeleteAllMessages(boolean deleteAllMessages) {
2800        this.deleteAllMessages = deleteAllMessages;
2801    }
2802
2803    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2804        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2805    }
2806
2807    public int getIndexWriteBatchSize() {
2808        return setIndexWriteBatchSize;
2809    }
2810
2811    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2812        this.enableIndexWriteAsync = enableIndexWriteAsync;
2813    }
2814
2815    boolean isEnableIndexWriteAsync() {
2816        return enableIndexWriteAsync;
2817    }
2818
2819    public boolean isEnableJournalDiskSyncs() {
2820        return enableJournalDiskSyncs;
2821    }
2822
2823    public void setEnableJournalDiskSyncs(boolean syncWrites) {
2824        this.enableJournalDiskSyncs = syncWrites;
2825    }
2826
2827    public long getCheckpointInterval() {
2828        return checkpointInterval;
2829    }
2830
2831    public void setCheckpointInterval(long checkpointInterval) {
2832        this.checkpointInterval = checkpointInterval;
2833    }
2834
2835    public long getCleanupInterval() {
2836        return cleanupInterval;
2837    }
2838
2839    public void setCleanupInterval(long cleanupInterval) {
2840        this.cleanupInterval = cleanupInterval;
2841    }
2842
2843    public void setJournalMaxFileLength(int journalMaxFileLength) {
2844        this.journalMaxFileLength = journalMaxFileLength;
2845    }
2846
2847    public int getJournalMaxFileLength() {
2848        return journalMaxFileLength;
2849    }
2850
2851    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
2852        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
2853    }
2854
2855    public int getMaxFailoverProducersToTrack() {
2856        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
2857    }
2858
2859    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
2860        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
2861    }
2862
2863    public int getFailoverProducersAuditDepth() {
2864        return this.metadata.producerSequenceIdTracker.getAuditDepth();
2865    }
2866
2867    public PageFile getPageFile() throws IOException {
2868        if (pageFile == null) {
2869            pageFile = createPageFile();
2870        }
2871        return pageFile;
2872    }
2873
2874    public Journal getJournal() throws IOException {
2875        if (journal == null) {
2876            journal = createJournal();
2877        }
2878        return journal;
2879    }
2880
2881    public boolean isFailIfDatabaseIsLocked() {
2882        return failIfDatabaseIsLocked;
2883    }
2884
2885    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
2886        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
2887    }
2888
2889    public boolean isIgnoreMissingJournalfiles() {
2890        return ignoreMissingJournalfiles;
2891    }
2892
2893    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
2894        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
2895    }
2896
2897    public int getIndexCacheSize() {
2898        return indexCacheSize;
2899    }
2900
2901    public void setIndexCacheSize(int indexCacheSize) {
2902        this.indexCacheSize = indexCacheSize;
2903    }
2904
2905    public boolean isCheckForCorruptJournalFiles() {
2906        return checkForCorruptJournalFiles;
2907    }
2908
2909    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
2910        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
2911    }
2912
2913    public boolean isChecksumJournalFiles() {
2914        return checksumJournalFiles;
2915    }
2916
2917    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
2918        this.checksumJournalFiles = checksumJournalFiles;
2919    }
2920
2921    @Override
2922    public void setBrokerService(BrokerService brokerService) {
2923        this.brokerService = brokerService;
2924    }
2925
2926    /**
2927     * @return the archiveDataLogs
2928     */
2929    public boolean isArchiveDataLogs() {
2930        return this.archiveDataLogs;
2931    }
2932
2933    /**
2934     * @param archiveDataLogs the archiveDataLogs to set
2935     */
2936    public void setArchiveDataLogs(boolean archiveDataLogs) {
2937        this.archiveDataLogs = archiveDataLogs;
2938    }
2939
2940    /**
2941     * @return the directoryArchive
2942     */
2943    public File getDirectoryArchive() {
2944        return this.directoryArchive;
2945    }
2946
2947    /**
2948     * @param directoryArchive the directoryArchive to set
2949     */
2950    public void setDirectoryArchive(File directoryArchive) {
2951        this.directoryArchive = directoryArchive;
2952    }
2953
2954    public boolean isArchiveCorruptedIndex() {
2955        return archiveCorruptedIndex;
2956    }
2957
2958    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
2959        this.archiveCorruptedIndex = archiveCorruptedIndex;
2960    }
2961
2962    public float getIndexLFUEvictionFactor() {
2963        return indexLFUEvictionFactor;
2964    }
2965
2966    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
2967        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
2968    }
2969
2970    public boolean isUseIndexLFRUEviction() {
2971        return useIndexLFRUEviction;
2972    }
2973
2974    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
2975        this.useIndexLFRUEviction = useIndexLFRUEviction;
2976    }
2977
2978    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
2979        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
2980    }
2981
2982    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
2983        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
2984    }
2985
2986    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
2987        this.enableIndexPageCaching = enableIndexPageCaching;
2988    }
2989
2990    public boolean isEnableIndexDiskSyncs() {
2991        return enableIndexDiskSyncs;
2992    }
2993
2994    public boolean isEnableIndexRecoveryFile() {
2995        return enableIndexRecoveryFile;
2996    }
2997
2998    public boolean isEnableIndexPageCaching() {
2999        return enableIndexPageCaching;
3000    }
3001
3002    // /////////////////////////////////////////////////////////////////
3003    // Internal conversion methods.
3004    // /////////////////////////////////////////////////////////////////
3005
3006    class MessageOrderCursor{
3007        long defaultCursorPosition;
3008        long lowPriorityCursorPosition;
3009        long highPriorityCursorPosition;
3010        MessageOrderCursor(){
3011        }
3012
3013        MessageOrderCursor(long position){
3014            this.defaultCursorPosition=position;
3015            this.lowPriorityCursorPosition=position;
3016            this.highPriorityCursorPosition=position;
3017        }
3018
3019        MessageOrderCursor(MessageOrderCursor other){
3020            this.defaultCursorPosition=other.defaultCursorPosition;
3021            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3022            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3023        }
3024
3025        MessageOrderCursor copy() {
3026            return new MessageOrderCursor(this);
3027        }
3028
3029        void reset() {
3030            this.defaultCursorPosition=0;
3031            this.highPriorityCursorPosition=0;
3032            this.lowPriorityCursorPosition=0;
3033        }
3034
3035        void increment() {
3036            if (defaultCursorPosition!=0) {
3037                defaultCursorPosition++;
3038            }
3039            if (highPriorityCursorPosition!=0) {
3040                highPriorityCursorPosition++;
3041            }
3042            if (lowPriorityCursorPosition!=0) {
3043                lowPriorityCursorPosition++;
3044            }
3045        }
3046
3047        @Override
3048        public String toString() {
3049           return "MessageOrderCursor:[def:" + defaultCursorPosition
3050                   + ", low:" + lowPriorityCursorPosition
3051                   + ", high:" +  highPriorityCursorPosition + "]";
3052        }
3053
3054        public void sync(MessageOrderCursor other) {
3055            this.defaultCursorPosition=other.defaultCursorPosition;
3056            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3057            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3058        }
3059    }
3060
3061    class MessageOrderIndex {
3062        static final byte HI = 9;
3063        static final byte LO = 0;
3064        static final byte DEF = 4;
3065
3066        long nextMessageId;
3067        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3068        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3069        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3070        final MessageOrderCursor cursor = new MessageOrderCursor();
3071        Long lastDefaultKey;
3072        Long lastHighKey;
3073        Long lastLowKey;
3074        byte lastGetPriority;
3075        final List<Long> pendingAdditions = new LinkedList<Long>();
3076        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
3077
3078        MessageKeys remove(Transaction tx, Long key) throws IOException {
3079            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3080            if (result == null && highPriorityIndex!=null) {
3081                result = highPriorityIndex.remove(tx, key);
3082                if (result ==null && lowPriorityIndex!=null) {
3083                    result = lowPriorityIndex.remove(tx, key);
3084                }
3085            }
3086            return result;
3087        }
3088
3089        void load(Transaction tx) throws IOException {
3090            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3091            defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3092            defaultPriorityIndex.load(tx);
3093            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3094            lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3095            lowPriorityIndex.load(tx);
3096            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3097            highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3098            highPriorityIndex.load(tx);
3099        }
3100
3101        void allocate(Transaction tx) throws IOException {
3102            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3103            if (metadata.version >= 2) {
3104                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3105                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3106            }
3107        }
3108
3109        void configureLast(Transaction tx) throws IOException {
3110            // Figure out the next key using the last entry in the destination.
3111            TreeSet<Long> orderedSet = new TreeSet<Long>();
3112
3113            addLast(orderedSet, highPriorityIndex, tx);
3114            addLast(orderedSet, defaultPriorityIndex, tx);
3115            addLast(orderedSet, lowPriorityIndex, tx);
3116
3117            if (!orderedSet.isEmpty()) {
3118                nextMessageId = orderedSet.last() + 1;
3119            }
3120        }
3121
3122        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3123            if (index != null) {
3124                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3125                if (lastEntry != null) {
3126                    orderedSet.add(lastEntry.getKey());
3127                }
3128            }
3129        }
3130
3131        void clear(Transaction tx) throws IOException {
3132            this.remove(tx);
3133            this.resetCursorPosition();
3134            this.allocate(tx);
3135            this.load(tx);
3136            this.configureLast(tx);
3137        }
3138
3139        void remove(Transaction tx) throws IOException {
3140            defaultPriorityIndex.clear(tx);
3141            defaultPriorityIndex.unload(tx);
3142            tx.free(defaultPriorityIndex.getPageId());
3143            if (lowPriorityIndex != null) {
3144                lowPriorityIndex.clear(tx);
3145                lowPriorityIndex.unload(tx);
3146
3147                tx.free(lowPriorityIndex.getPageId());
3148            }
3149            if (highPriorityIndex != null) {
3150                highPriorityIndex.clear(tx);
3151                highPriorityIndex.unload(tx);
3152                tx.free(highPriorityIndex.getPageId());
3153            }
3154        }
3155
3156        void resetCursorPosition() {
3157            this.cursor.reset();
3158            lastDefaultKey = null;
3159            lastHighKey = null;
3160            lastLowKey = null;
3161        }
3162
3163        void setBatch(Transaction tx, Long sequence) throws IOException {
3164            if (sequence != null) {
3165                Long nextPosition = new Long(sequence.longValue() + 1);
3166                lastDefaultKey = sequence;
3167                cursor.defaultCursorPosition = nextPosition.longValue();
3168                lastHighKey = sequence;
3169                cursor.highPriorityCursorPosition = nextPosition.longValue();
3170                lastLowKey = sequence;
3171                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3172            }
3173        }
3174
3175        void setBatch(Transaction tx, LastAck last) throws IOException {
3176            setBatch(tx, last.lastAckedSequence);
3177            if (cursor.defaultCursorPosition == 0
3178                    && cursor.highPriorityCursorPosition == 0
3179                    && cursor.lowPriorityCursorPosition == 0) {
3180                long next = last.lastAckedSequence + 1;
3181                switch (last.priority) {
3182                    case DEF:
3183                        cursor.defaultCursorPosition = next;
3184                        cursor.highPriorityCursorPosition = next;
3185                        break;
3186                    case HI:
3187                        cursor.highPriorityCursorPosition = next;
3188                        break;
3189                    case LO:
3190                        cursor.lowPriorityCursorPosition = next;
3191                        cursor.defaultCursorPosition = next;
3192                        cursor.highPriorityCursorPosition = next;
3193                        break;
3194                }
3195            }
3196        }
3197
3198        void stoppedIterating() {
3199            if (lastDefaultKey!=null) {
3200                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3201            }
3202            if (lastHighKey!=null) {
3203                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3204            }
3205            if (lastLowKey!=null) {
3206                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3207            }
3208            lastDefaultKey = null;
3209            lastHighKey = null;
3210            lastLowKey = null;
3211        }
3212
3213        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3214                throws IOException {
3215            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3216                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3217            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3218                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3219            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3220                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3221            }
3222        }
3223
3224        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3225                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3226
3227            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3228            deletes.add(iterator.next());
3229        }
3230
3231        long getNextMessageId() {
3232            return nextMessageId++;
3233        }
3234
3235        void revertNextMessageId() {
3236            nextMessageId--;
3237        }
3238
3239        MessageKeys get(Transaction tx, Long key) throws IOException {
3240            MessageKeys result = defaultPriorityIndex.get(tx, key);
3241            if (result == null) {
3242                result = highPriorityIndex.get(tx, key);
3243                if (result == null) {
3244                    result = lowPriorityIndex.get(tx, key);
3245                    lastGetPriority = LO;
3246                } else {
3247                    lastGetPriority = HI;
3248                }
3249            } else {
3250                lastGetPriority = DEF;
3251            }
3252            return result;
3253        }
3254
3255        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3256            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3257                return defaultPriorityIndex.put(tx, key, value);
3258            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3259                return highPriorityIndex.put(tx, key, value);
3260            } else {
3261                return lowPriorityIndex.put(tx, key, value);
3262            }
3263        }
3264
3265        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3266            return new MessageOrderIterator(tx,cursor,this);
3267        }
3268
3269        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3270            return new MessageOrderIterator(tx,m,this);
3271        }
3272
3273        public byte lastGetPriority() {
3274            return lastGetPriority;
3275        }
3276
3277        public boolean alreadyDispatched(Long sequence) {
3278            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3279                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3280                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3281        }
3282
3283        public void trackPendingAdd(Long seq) {
3284            synchronized (pendingAdditions) {
3285                pendingAdditions.add(seq);
3286            }
3287        }
3288
3289        public void trackPendingAddComplete(Long seq) {
3290            synchronized (pendingAdditions) {
3291                pendingAdditions.remove(seq);
3292            }
3293        }
3294
3295        public Long minPendingAdd() {
3296            synchronized (pendingAdditions) {
3297                if (!pendingAdditions.isEmpty()) {
3298                    return pendingAdditions.get(0);
3299                } else {
3300                    return null;
3301                }
3302            }
3303        }
3304
3305        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3306            Iterator<Entry<Long, MessageKeys>>currentIterator;
3307            final Iterator<Entry<Long, MessageKeys>>highIterator;
3308            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3309            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3310
3311            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3312                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3313                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3314                if (highPriorityIndex != null) {
3315                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3316                } else {
3317                    this.highIterator = null;
3318                }
3319                if (lowPriorityIndex != null) {
3320                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3321                } else {
3322                    this.lowIterator = null;
3323                }
3324            }
3325
3326            @Override
3327            public boolean hasNext() {
3328                if (currentIterator == null) {
3329                    if (highIterator != null) {
3330                        if (highIterator.hasNext()) {
3331                            currentIterator = highIterator;
3332                            return currentIterator.hasNext();
3333                        }
3334                        if (defaultIterator.hasNext()) {
3335                            currentIterator = defaultIterator;
3336                            return currentIterator.hasNext();
3337                        }
3338                        if (lowIterator.hasNext()) {
3339                            currentIterator = lowIterator;
3340                            return currentIterator.hasNext();
3341                        }
3342                        return false;
3343                    } else {
3344                        currentIterator = defaultIterator;
3345                        return currentIterator.hasNext();
3346                    }
3347                }
3348                if (highIterator != null) {
3349                    if (currentIterator.hasNext()) {
3350                        return true;
3351                    }
3352                    if (currentIterator == highIterator) {
3353                        if (defaultIterator.hasNext()) {
3354                            currentIterator = defaultIterator;
3355                            return currentIterator.hasNext();
3356                        }
3357                        if (lowIterator.hasNext()) {
3358                            currentIterator = lowIterator;
3359                            return currentIterator.hasNext();
3360                        }
3361                        return false;
3362                    }
3363
3364                    if (currentIterator == defaultIterator) {
3365                        if (lowIterator.hasNext()) {
3366                            currentIterator = lowIterator;
3367                            return currentIterator.hasNext();
3368                        }
3369                        return false;
3370                    }
3371                }
3372                return currentIterator.hasNext();
3373            }
3374
3375            @Override
3376            public Entry<Long, MessageKeys> next() {
3377                Entry<Long, MessageKeys> result = currentIterator.next();
3378                if (result != null) {
3379                    Long key = result.getKey();
3380                    if (highIterator != null) {
3381                        if (currentIterator == defaultIterator) {
3382                            lastDefaultKey = key;
3383                        } else if (currentIterator == highIterator) {
3384                            lastHighKey = key;
3385                        } else {
3386                            lastLowKey = key;
3387                        }
3388                    } else {
3389                        lastDefaultKey = key;
3390                    }
3391                }
3392                return result;
3393            }
3394
3395            @Override
3396            public void remove() {
3397                throw new UnsupportedOperationException();
3398            }
3399        }
3400    }
3401
3402    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3403        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3404
3405        @Override
3406        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3407            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3408            ObjectOutputStream oout = new ObjectOutputStream(baos);
3409            oout.writeObject(object);
3410            oout.flush();
3411            oout.close();
3412            byte[] data = baos.toByteArray();
3413            dataOut.writeInt(data.length);
3414            dataOut.write(data);
3415        }
3416
3417        @Override
3418        @SuppressWarnings("unchecked")
3419        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3420            int dataLen = dataIn.readInt();
3421            byte[] data = new byte[dataLen];
3422            dataIn.readFully(data);
3423            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3424            ObjectInputStream oin = new ObjectInputStream(bais);
3425            try {
3426                return (HashSet<String>) oin.readObject();
3427            } catch (ClassNotFoundException cfe) {
3428                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3429                ioe.initCause(cfe);
3430                throw ioe;
3431            }
3432        }
3433    }
3434
3435    public File getIndexDirectory() {
3436        return indexDirectory;
3437    }
3438
3439    public void setIndexDirectory(File indexDirectory) {
3440        this.indexDirectory = indexDirectory;
3441    }
3442
3443    interface IndexAware {
3444        public void sequenceAssignedWithIndexLocked(long index);
3445    }
3446
3447    public String getPreallocationScope() {
3448        return preallocationScope;
3449    }
3450
3451    public void setPreallocationScope(String preallocationScope) {
3452        this.preallocationScope = preallocationScope;
3453    }
3454
3455    public String getPreallocationStrategy() {
3456        return preallocationStrategy;
3457    }
3458
3459    public void setPreallocationStrategy(String preallocationStrategy) {
3460        this.preallocationStrategy = preallocationStrategy;
3461    }
3462}