001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInput;
022    import java.io.DataOutput;
023    import java.io.EOFException;
024    import java.io.File;
025    import java.io.IOException;
026    import java.io.InputStream;
027    import java.io.InterruptedIOException;
028    import java.io.ObjectInputStream;
029    import java.io.ObjectOutputStream;
030    import java.io.OutputStream;
031    import java.util.ArrayList;
032    import java.util.Arrays;
033    import java.util.Collection;
034    import java.util.Collections;
035    import java.util.Date;
036    import java.util.HashMap;
037    import java.util.HashSet;
038    import java.util.Iterator;
039    import java.util.LinkedHashMap;
040    import java.util.LinkedHashSet;
041    import java.util.List;
042    import java.util.Map;
043    import java.util.Map.Entry;
044    import java.util.Set;
045    import java.util.SortedSet;
046    import java.util.Stack;
047    import java.util.TreeMap;
048    import java.util.TreeSet;
049    import java.util.concurrent.atomic.AtomicBoolean;
050    import java.util.concurrent.atomic.AtomicLong;
051    import java.util.concurrent.locks.ReentrantReadWriteLock;
052    
053    import org.apache.activemq.ActiveMQMessageAuditNoSync;
054    import org.apache.activemq.broker.BrokerService;
055    import org.apache.activemq.broker.BrokerServiceAware;
056    import org.apache.activemq.command.MessageAck;
057    import org.apache.activemq.command.SubscriptionInfo;
058    import org.apache.activemq.command.TransactionId;
059    import org.apache.activemq.protobuf.Buffer;
060    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
061    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
062    import org.apache.activemq.store.kahadb.data.KahaDestination;
063    import org.apache.activemq.store.kahadb.data.KahaEntryType;
064    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
065    import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
066    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
067    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
068    import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
069    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
070    import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
071    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
072    import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
073    import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
074    import org.apache.activemq.store.kahadb.disk.index.ListIndex;
075    import org.apache.activemq.store.kahadb.disk.journal.DataFile;
076    import org.apache.activemq.store.kahadb.disk.journal.Journal;
077    import org.apache.activemq.store.kahadb.disk.journal.Location;
078    import org.apache.activemq.store.kahadb.disk.page.Page;
079    import org.apache.activemq.store.kahadb.disk.page.PageFile;
080    import org.apache.activemq.store.kahadb.disk.page.Transaction;
081    import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
082    import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
083    import org.apache.activemq.store.kahadb.disk.util.Marshaller;
084    import org.apache.activemq.store.kahadb.disk.util.Sequence;
085    import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
086    import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
087    import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
088    import org.apache.activemq.util.ByteSequence;
089    import org.apache.activemq.util.Callback;
090    import org.apache.activemq.util.DataByteArrayInputStream;
091    import org.apache.activemq.util.DataByteArrayOutputStream;
092    import org.apache.activemq.util.IOHelper;
093    import org.apache.activemq.util.ServiceStopper;
094    import org.apache.activemq.util.ServiceSupport;
095    import org.slf4j.Logger;
096    import org.slf4j.LoggerFactory;
097    
098    public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
099    
           /** Broker service reference; the checkpoint thread hands IOExceptions to its handleIOException. */
100        protected BrokerService brokerService;
101    
           /** Name of the system property from which {@link #LOG_SLOW_ACCESS_TIME} is read. */
102        public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
           /** Integer value of that system property, or 0 when it is not set. NOTE(review): presumably milliseconds - confirm. */
103        public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
           /** Default store directory: the relative path "KahaDB". */
104        public static final File DEFAULT_DIRECTORY = new File("KahaDB");
           /** Shared zero-length buffer. */
105        protected static final Buffer UNMATCHED;
106        static {
107            UNMATCHED = new Buffer(new byte[]{});
108        }
109        private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
           // 10 * 1000; presumably milliseconds - not referenced in this part of the file, confirm at the use site.
110        private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
111    
           // Values stored in Metadata.state; loadPageFile() and unload() write CLOSED_STATE.
112        static final int CLOSED_STATE = 1;
113        static final int OPEN_STATE = 2;
           // Sentinel value (-1); its meaning is defined by code outside this part of the file.
114        static final long NOT_ACKED = -1;
115    
           /** Store format version; written as the last field by Metadata.write and used as the default Metadata.version. */
116        static final int VERSION = 4;
117    
118        protected class Metadata {
119            protected Page<Metadata> page;
120            protected int state;
121            protected BTreeIndex<String, StoredDestination> destinations;
122            protected Location lastUpdate;
123            protected Location firstInProgressTransactionLocation;
124            protected Location producerSequenceIdTrackerLocation = null;
125            protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
126            protected int version = VERSION;
127            public void read(DataInput is) throws IOException {
128                state = is.readInt();
129                destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
130                if (is.readBoolean()) {
131                    lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
132                } else {
133                    lastUpdate = null;
134                }
135                if (is.readBoolean()) {
136                    firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
137                } else {
138                    firstInProgressTransactionLocation = null;
139                }
140                try {
141                    if (is.readBoolean()) {
142                        producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
143                    } else {
144                        producerSequenceIdTrackerLocation = null;
145                    }
146                } catch (EOFException expectedOnUpgrade) {
147                }
148                try {
149                   version = is.readInt();
150                } catch (EOFException expectedOnUpgrade) {
151                    version=1;
152                }
153                LOG.info("KahaDB is version " + version);
154            }
155    
156            public void write(DataOutput os) throws IOException {
157                os.writeInt(state);
158                os.writeLong(destinations.getPageId());
159    
160                if (lastUpdate != null) {
161                    os.writeBoolean(true);
162                    LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
163                } else {
164                    os.writeBoolean(false);
165                }
166    
167                if (firstInProgressTransactionLocation != null) {
168                    os.writeBoolean(true);
169                    LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
170                } else {
171                    os.writeBoolean(false);
172                }
173    
174                if (producerSequenceIdTrackerLocation != null) {
175                    os.writeBoolean(true);
176                    LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
177                } else {
178                    os.writeBoolean(false);
179                }
180                os.writeInt(VERSION);
181            }
182        }
183    
184        class MetadataMarshaller extends VariableMarshaller<Metadata> {
185            @Override
186            public Metadata readPayload(DataInput dataIn) throws IOException {
187                Metadata rc = new Metadata();
188                rc.read(dataIn);
189                return rc;
190            }
191    
192            @Override
193            public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
194                object.write(dataOut);
195            }
196        }
197    
           // Index page file and journal; the code in this class obtains them through getPageFile()/getJournal().
198        protected PageFile pageFile;
199        protected Journal journal;
           // Current index root record; replaced by loadPageFile(), open() (after a failed index load) and close().
200        protected Metadata metadata = new Metadata();
201    
202        protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
203    
204        protected boolean failIfDatabaseIsLocked;
205    
           // When true, load() deletes the journal and the page file before opening, then resets the flag.
206        protected boolean deleteAllMessages;
           // Store directory; load() creates it with IOHelper.mkdirs.
207        protected File directory = DEFAULT_DIRECTORY;
           // Background worker created by startCheckpoint(); accessed under checkpointThreadLock.
208        protected Thread checkpointThread;
209        protected boolean enableJournalDiskSyncs=true;
210        protected boolean archiveDataLogs;
211        protected File directoryArchive;
212        protected AtomicLong journalSize = new AtomicLong(0);
           // Both intervals are compared with System.currentTimeMillis() differences, i.e. milliseconds;
           // 0 disables the respective periodic task (see startCheckpoint()).
213        long checkpointInterval = 5*1000;
214        long cleanupInterval = 30*1000;
215        int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
216        int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
217        boolean enableIndexWriteAsync = false;
           // NOTE(review): field is named like a setter; presumably the index write batch size - confirm.
218        int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
219    
           // Flipped with compareAndSet by open() and close(); the checkpoint thread loops while it is true.
220        protected AtomicBoolean opened = new AtomicBoolean();
221        private boolean ignoreMissingJournalfiles = false;
222        private int indexCacheSize = 10000;
223        private boolean checkForCorruptJournalFiles = false;
224        private boolean checksumJournalFiles = false;
225        protected boolean forceRecoverIndex = false;
           // Monitor guarding creation of, and joining on, checkpointThread.
226        private final Object checkpointThreadLock = new Object();
227        private boolean rewriteOnRedelivery = false;
           // When the index fails to load, open() archives the page file if this is true and deletes it otherwise.
228        private boolean archiveCorruptedIndex = false;
229        private boolean useIndexLFRUEviction = false;
230        private float indexLFUEvictionFactor = 0.2f;
231        private boolean enableIndexDiskSyncs = true;
232        private boolean enableIndexRecoveryFile = true;
233        private boolean enableIndexPageCaching = true;
234    
           /** Creates an unopened database; nothing is touched on disk until {@link #load()} runs. */
235        public MessageDatabase() {
236        }
237    
           /**
            * Service start hook: delegates to {@link #load()}.
            *
            * @throws Exception whatever {@link #load()} throws
            */
238        @Override
239        public void doStart() throws Exception {
240            load();
241        }
242    
           /**
            * Service stop hook: delegates to {@link #unload()}.
            *
            * @param stopper not used by this implementation
            * @throws Exception whatever {@link #unload()} throws
            */
243        @Override
244        public void doStop(ServiceStopper stopper) throws Exception {
245            unload();
246        }
247    
           /**
            * Loads the index page file and the metadata record stored on page 0, then
            * loads every stored destination into {@code storedDestinations}. The whole
            * operation runs under the index write lock.
            *
            * @throws IOException if the page file or any index page cannot be loaded or stored
            */
248        private void loadPageFile() throws IOException {
249            this.indexLock.writeLock().lock();
250            try {
                   // Final local (shadows the field) so the anonymous closures below can capture it.
251                final PageFile pageFile = getPageFile();
252                pageFile.load();
253                pageFile.tx().execute(new Transaction.Closure<IOException>() {
254                    @Override
255                    public void execute(Transaction tx) throws IOException {
256                        if (pageFile.getPageCount() == 0) {
257                            // First time this is created.. Initialize the metadata
258                            Page<Metadata> page = tx.allocate();
                               // The else branch reads the metadata back from page 0, so it must live there.
259                            assert page.getPageId() == 0;
260                            page.set(metadata);
261                            metadata.page = page;
262                            metadata.state = CLOSED_STATE;
                               // A second freshly allocated page becomes the root of the destinations index.
263                            metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
264    
265                            tx.store(metadata.page, metadataMarshaller, true);
266                        } else {
                               // Existing page file: replace the in-memory metadata with the stored record.
267                            Page<Metadata> page = tx.load(0, metadataMarshaller);
268                            metadata = page.get();
269                            metadata.page = page;
270                        }
                           // Marshallers must be set before the index is loaded in either branch.
271                        metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
272                        metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
273                        metadata.destinations.load(tx);
274                    }
275                });
276                // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
277                // Perhaps we should just keep an index of file
278                storedDestinations.clear();
279                pageFile.tx().execute(new Transaction.Closure<IOException>() {
280                    @Override
281                    public void execute(Transaction tx) throws IOException {
282                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
283                            Entry<String, StoredDestination> entry = iterator.next();
                               // Third argument: whether the stored entry has a subscriptions index
                               // (presumably marks a topic - confirm against loadStoredDestination).
284                            StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
285                            storedDestinations.put(entry.getKey(), sd);
286                        }
287                    }
288                });
289                pageFile.flush();
290            } finally {
291                this.indexLock.writeLock().unlock();
292            }
293        }
294    
295        private void startCheckpoint() {
296            if (checkpointInterval == 0 &&  cleanupInterval == 0) {
297                LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
298                return;
299            }
300            synchronized (checkpointThreadLock) {
301                boolean start = false;
302                if (checkpointThread == null) {
303                    start = true;
304                } else if (!checkpointThread.isAlive()) {
305                    start = true;
306                    LOG.info("KahaDB: Recovering checkpoint thread after death");
307                }
308                if (start) {
309                    checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
310                        @Override
311                        public void run() {
312                            try {
313                                long lastCleanup = System.currentTimeMillis();
314                                long lastCheckpoint = System.currentTimeMillis();
315                                // Sleep for a short time so we can periodically check
316                                // to see if we need to exit this thread.
317                                long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
318                                while (opened.get()) {
319                                    Thread.sleep(sleepTime);
320                                    long now = System.currentTimeMillis();
321                                    if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
322                                        checkpointCleanup(true);
323                                        lastCleanup = now;
324                                        lastCheckpoint = now;
325                                    } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
326                                        checkpointCleanup(false);
327                                        lastCheckpoint = now;
328                                    }
329                                }
330                            } catch (InterruptedException e) {
331                                // Looks like someone really wants us to exit this thread...
332                            } catch (IOException ioe) {
333                                LOG.error("Checkpoint failed", ioe);
334                                brokerService.handleIOException(ioe);
335                            }
336                        }
337                    };
338    
339                    checkpointThread.setDaemon(true);
340                    checkpointThread.start();
341                }
342            }
343        }
344    
345        public void open() throws IOException {
346            if( opened.compareAndSet(false, true) ) {
347                getJournal().start();
348                try {
349                    loadPageFile();
350                } catch (Throwable t) {
351                    LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
352                    if (LOG.isDebugEnabled()) {
353                        LOG.debug("Index load failure", t);
354                    }
355                    // try to recover index
356                    try {
357                        pageFile.unload();
358                    } catch (Exception ignore) {}
359                    if (archiveCorruptedIndex) {
360                        pageFile.archive();
361                    } else {
362                        pageFile.delete();
363                    }
364                    metadata = new Metadata();
365                    pageFile = null;
366                    loadPageFile();
367                }
368                startCheckpoint();
369                recover();
370            }
371        }
372    
373        public void load() throws IOException {
374            this.indexLock.writeLock().lock();
375            IOHelper.mkdirs(directory);
376            try {
377                if (deleteAllMessages) {
378                    getJournal().start();
379                    getJournal().delete();
380                    getJournal().close();
381                    journal = null;
382                    getPageFile().delete();
383                    LOG.info("Persistence store purged.");
384                    deleteAllMessages = false;
385                }
386    
387                open();
388                store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
389            } finally {
390                this.indexLock.writeLock().unlock();
391            }
392        }
393    
394        public void close() throws IOException, InterruptedException {
395            if( opened.compareAndSet(true, false)) {
396                this.indexLock.writeLock().lock();
397                try {
398                    if (metadata.page != null) {
399                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
400                            @Override
401                            public void execute(Transaction tx) throws IOException {
402                                checkpointUpdate(tx, true);
403                            }
404                        });
405                    }
406                    pageFile.unload();
407                    metadata = new Metadata();
408                } finally {
409                    this.indexLock.writeLock().unlock();
410                }
411                journal.close();
412                synchronized (checkpointThreadLock) {
413                    if (checkpointThread != null) {
414                        checkpointThread.join();
415                    }
416                }
417            }
418        }
419    
420        public void unload() throws IOException, InterruptedException {
421            this.indexLock.writeLock().lock();
422            try {
423                if( pageFile != null && pageFile.isLoaded() ) {
424                    metadata.state = CLOSED_STATE;
425                    metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
426    
427                    if (metadata.page != null) {
428                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
429                            @Override
430                            public void execute(Transaction tx) throws IOException {
431                                tx.store(metadata.page, metadataMarshaller, true);
432                            }
433                        });
434                    }
435                }
436            } finally {
437                this.indexLock.writeLock().unlock();
438            }
439            close();
440        }
441    
442        // public for testing
443        @SuppressWarnings("rawtypes")
444        public Location[] getInProgressTxLocationRange() {
445            Location[] range = new Location[]{null, null};
446            synchronized (inflightTransactions) {
447                if (!inflightTransactions.isEmpty()) {
448                    for (List<Operation> ops : inflightTransactions.values()) {
449                        if (!ops.isEmpty()) {
450                            trackMaxAndMin(range, ops);
451                        }
452                    }
453                }
454                if (!preparedTransactions.isEmpty()) {
455                    for (List<Operation> ops : preparedTransactions.values()) {
456                        if (!ops.isEmpty()) {
457                            trackMaxAndMin(range, ops);
458                        }
459                    }
460                }
461            }
462            return range;
463        }
464    
465        private void trackMaxAndMin(Location[] range, List<Operation> ops) {
466            Location t = ops.get(0).getLocation();
467            if (range[0]==null || t.compareTo(range[0]) <= 0) {
468                range[0] = t;
469            }
470            t = ops.get(ops.size() -1).getLocation();
471            if (range[1]==null || t.compareTo(range[1]) >= 0) {
472                range[1] = t;
473            }
474        }
475    
476        class TranInfo {
477            TransactionId id;
478            Location location;
479    
480            class opCount {
481                int add;
482                int remove;
483            }
484            HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
485    
486            public void track(Operation operation) {
487                if (location == null ) {
488                    location = operation.getLocation();
489                }
490                KahaDestination destination;
491                boolean isAdd = false;
492                if (operation instanceof AddOpperation) {
493                    AddOpperation add = (AddOpperation) operation;
494                    destination = add.getCommand().getDestination();
495                    isAdd = true;
496                } else {
497                    RemoveOpperation removeOpperation = (RemoveOpperation) operation;
498                    destination = removeOpperation.getCommand().getDestination();
499                }
500                opCount opCount = destinationOpCount.get(destination);
501                if (opCount == null) {
502                    opCount = new opCount();
503                    destinationOpCount.put(destination, opCount);
504                }
505                if (isAdd) {
506                    opCount.add++;
507                } else {
508                    opCount.remove++;
509                }
510            }
511    
512            @Override
513            public String toString() {
514               StringBuffer buffer = new StringBuffer();
515               buffer.append(location).append(";").append(id).append(";\n");
516               for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
517                   buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
518               }
519               return buffer.toString();
520            }
521        }
522    
523        @SuppressWarnings("rawtypes")
524        public String getTransactions() {
525    
526            ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
527            synchronized (inflightTransactions) {
528                if (!inflightTransactions.isEmpty()) {
529                    for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
530                        TranInfo info = new TranInfo();
531                        info.id = entry.getKey();
532                        for (Operation operation : entry.getValue()) {
533                            info.track(operation);
534                        }
535                        infos.add(info);
536                    }
537                }
538            }
539            return infos.toString();
540        }
541    
542        /**
543         * Replays the journal from the recovery position into the index, undoes
544         * index entries that are ahead of the journal and rolls back recovered
545         * in-flight local transactions. Runs under the index write lock.
546         * @throws IOException if a journal entry cannot be loaded or an index
547         *         update fails
548         * @throws IllegalStateException
549         */
550        private void recover() throws IllegalStateException, IOException {
551            this.indexLock.writeLock().lock();
552            try {
553    
554                long start = System.currentTimeMillis();
                   // Two candidate start points: right after the stored producer audit (or the
                   // journal start when there is none) and the index's own recovery position.
555                Location producerAuditPosition = recoverProducerAudit();
556                Location lastIndoubtPosition = getRecoveryPosition();
557    
                   // Replay starts at the earlier of the two; a null candidate is ignored.
558                Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
559    
560                if (recoveryPosition != null) {
561                    int redoCounter = 0;
562                    LOG.info("Recovering from the journal ...");
                       // Walk the journal record by record until getNextLocation returns null.
563                    while (recoveryPosition != null) {
564                        JournalCommand<?> message = load(recoveryPosition);
565                        metadata.lastUpdate = recoveryPosition;
566                        process(message, recoveryPosition, lastIndoubtPosition);
567                        redoCounter++;
568                        recoveryPosition = journal.getNextLocation(recoveryPosition);
                            // progress report every 100000 replayed entries
569                         if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
570                             LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries recovered ..");
571                         }
572                    }
573                    if (LOG.isInfoEnabled()) {
574                        long end = System.currentTimeMillis();
575                        LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
576                    }
577                }
578    
579                // We may have to undo some index updates.
580                pageFile.tx().execute(new Transaction.Closure<IOException>() {
581                    @Override
582                    public void execute(Transaction tx) throws IOException {
583                        recoverIndex(tx);
584                    }
585                });
586    
587                // rollback any recovered inflight local transactions
                   // Ids are collected first and rolled back in a second loop, so the key set
                   // is not being iterated while store(...) handles the rollback commands.
588                Set<TransactionId> toRollback = new HashSet<TransactionId>();
589                synchronized (inflightTransactions) {
590                    for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
591                        TransactionId id = it.next();
592                        if (id.isLocalTransaction()) {
593                            toRollback.add(id);
594                        }
595                    }
596                    for (TransactionId tx: toRollback) {
597                        if (LOG.isDebugEnabled()) {
598                            LOG.debug("rolling back recovered indoubt local transaction " + tx);
599                        }
600                        store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
601                    }
602                }
603            } finally {
604                this.indexLock.writeLock().unlock();
605            }
606        }
607    
           /**
            * Converts {@code tx} with {@code TransactionIdConversion.convertToLocal}.
            * Not called anywhere in this part of the file (hence the "unused" suppression).
            *
            * @param tx transaction id to convert
            * @return the converted transaction info
            */
608        @SuppressWarnings("unused")
609        private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
610            return TransactionIdConversion.convertToLocal(tx);
611        }
612    
613        private Location minimum(Location producerAuditPosition,
614                Location lastIndoubtPosition) {
615            Location min = null;
616            if (producerAuditPosition != null) {
617                min = producerAuditPosition;
618                if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
619                    min = lastIndoubtPosition;
620                }
621            } else {
622                min = lastIndoubtPosition;
623            }
624            return min;
625        }
626    
627        private Location recoverProducerAudit() throws IOException {
628            if (metadata.producerSequenceIdTrackerLocation != null) {
629                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
630                try {
631                    ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
632                    metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
633                    return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
634                } catch (Exception e) {
635                    LOG.warn("Cannot recover message audit", e);
636                    return journal.getNextLocation(null);
637                }
638            } else {
639                // got no audit stored so got to recreate via replay from start of the journal
640                return journal.getNextLocation(null);
641            }
642        }
643    
644        protected void recoverIndex(Transaction tx) throws IOException {
645            long start = System.currentTimeMillis();
646            // It is possible index updates got applied before the journal updates..
647            // in that case we need to removed references to messages that are not in the journal
648            final Location lastAppendLocation = journal.getLastAppendLocation();
649            long undoCounter=0;
650    
651            // Go through all the destinations to see if they have messages past the lastAppendLocation
652            for (StoredDestination sd : storedDestinations.values()) {
653    
654                final ArrayList<Long> matches = new ArrayList<Long>();
655                // Find all the Locations that are >= than the last Append Location.
656                sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
657                    @Override
658                    protected void matched(Location key, Long value) {
659                        matches.add(value);
660                    }
661                });
662    
663                for (Long sequenceId : matches) {
664                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
665                    sd.locationIndex.remove(tx, keys.location);
666                    sd.messageIdIndex.remove(tx, keys.messageId);
667                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
668                    undoCounter++;
669                    // TODO: do we need to modify the ack positions for the pub sub case?
670                }
671            }
672    
673            if( undoCounter > 0 ) {
674                // The rolledback operations are basically in flight journal writes.  To avoid getting
675                // these the end user should do sync writes to the journal.
676                if (LOG.isInfoEnabled()) {
677                    long end = System.currentTimeMillis();
678                    LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
679                }
680            }
681    
682            undoCounter = 0;
683            start = System.currentTimeMillis();
684    
685            // Lets be extra paranoid here and verify that all the datafiles being referenced
686            // by the indexes still exists.
687    
688            final SequenceSet ss = new SequenceSet();
689            for (StoredDestination sd : storedDestinations.values()) {
690                // Use a visitor to cut down the number of pages that we load
691                sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
692                    int last=-1;
693    
694                    @Override
695                    public boolean isInterestedInKeysBetween(Location first, Location second) {
696                        if( first==null ) {
697                            return !ss.contains(0, second.getDataFileId());
698                        } else if( second==null ) {
699                            return true;
700                        } else {
701                            return !ss.contains(first.getDataFileId(), second.getDataFileId());
702                        }
703                    }
704    
705                    @Override
706                    public void visit(List<Location> keys, List<Long> values) {
707                        for (Location l : keys) {
708                            int fileId = l.getDataFileId();
709                            if( last != fileId ) {
710                                ss.add(fileId);
711                                last = fileId;
712                            }
713                        }
714                    }
715    
716                });
717            }
718            HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
719            while (!ss.isEmpty()) {
720                missingJournalFiles.add((int) ss.removeFirst());
721            }
722            missingJournalFiles.removeAll(journal.getFileMap().keySet());
723    
724            if (!missingJournalFiles.isEmpty()) {
725                if (LOG.isInfoEnabled()) {
726                    LOG.info("Some journal files are missing: " + missingJournalFiles);
727                }
728            }
729    
730            ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
731            for (Integer missing : missingJournalFiles) {
732                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
733            }
734    
735            if (checkForCorruptJournalFiles) {
736                Collection<DataFile> dataFiles = journal.getFileMap().values();
737                for (DataFile dataFile : dataFiles) {
738                    int id = dataFile.getDataFileId();
739                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
740                    Sequence seq = dataFile.getCorruptedBlocks().getHead();
741                    while (seq != null) {
742                        missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
743                        seq = seq.getNext();
744                    }
745                }
746            }
747    
748            if (!missingPredicates.isEmpty()) {
749                for (StoredDestination sd : storedDestinations.values()) {
750    
751                    final ArrayList<Long> matches = new ArrayList<Long>();
752                    sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
753                        @Override
754                        protected void matched(Location key, Long value) {
755                            matches.add(value);
756                        }
757                    });
758    
759                    // If somes message references are affected by the missing data files...
760                    if (!matches.isEmpty()) {
761    
762                        // We either 'gracefully' recover dropping the missing messages or
763                        // we error out.
764                        if( ignoreMissingJournalfiles ) {
765                            // Update the index to remove the references to the missing data
766                            for (Long sequenceId : matches) {
767                                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
768                                sd.locationIndex.remove(tx, keys.location);
769                                sd.messageIdIndex.remove(tx, keys.messageId);
770                                undoCounter++;
771                                // TODO: do we need to modify the ack positions for the pub sub case?
772                            }
773    
774                        } else {
775                            throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
776                        }
777                    }
778                }
779            }
780    
781            if( undoCounter > 0 ) {
782                // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
783                // should do sync writes to the journal.
784                if (LOG.isInfoEnabled()) {
785                    long end = System.currentTimeMillis();
786                    LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
787                }
788            }
789        }
790    
791        private Location nextRecoveryPosition;
792        private Location lastRecoveryPosition;
793    
794        public void incrementalRecover() throws IOException {
795            this.indexLock.writeLock().lock();
796            try {
797                if( nextRecoveryPosition == null ) {
798                    if( lastRecoveryPosition==null ) {
799                        nextRecoveryPosition = getRecoveryPosition();
800                    } else {
801                        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
802                    }
803                }
804                while (nextRecoveryPosition != null) {
805                    lastRecoveryPosition = nextRecoveryPosition;
806                    metadata.lastUpdate = lastRecoveryPosition;
807                    JournalCommand<?> message = load(lastRecoveryPosition);
808                    process(message, lastRecoveryPosition, (Runnable)null);
809                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
810                }
811            } finally {
812                this.indexLock.writeLock().unlock();
813            }
814        }
815    
816        public Location getLastUpdatePosition() throws IOException {
817            return metadata.lastUpdate;
818        }
819    
820        private Location getRecoveryPosition() throws IOException {
821    
822            if (!this.forceRecoverIndex) {
823    
824                // If we need to recover the transactions..
825                if (metadata.firstInProgressTransactionLocation != null) {
826                    return metadata.firstInProgressTransactionLocation;
827                }
828    
829                // Perhaps there were no transactions...
830                if( metadata.lastUpdate!=null) {
831                    // Start replay at the record after the last one recorded in the index file.
832                    return journal.getNextLocation(metadata.lastUpdate);
833                }
834            }
835            // This loads the first position.
836            return journal.getNextLocation(null);
837        }
838    
839        protected void checkpointCleanup(final boolean cleanup) throws IOException {
840            long start;
841            this.indexLock.writeLock().lock();
842            try {
843                start = System.currentTimeMillis();
844                if( !opened.get() ) {
845                    return;
846                }
847                pageFile.tx().execute(new Transaction.Closure<IOException>() {
848                    @Override
849                    public void execute(Transaction tx) throws IOException {
850                        checkpointUpdate(tx, cleanup);
851                    }
852                });
853            } finally {
854                this.indexLock.writeLock().unlock();
855            }
856    
857            long end = System.currentTimeMillis();
858            if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
859                if (LOG.isInfoEnabled()) {
860                    LOG.info("Slow KahaDB access: cleanup took " + (end - start));
861                }
862            }
863        }
864    
865        public void checkpoint(Callback closure) throws Exception {
866            this.indexLock.writeLock().lock();
867            try {
868                pageFile.tx().execute(new Transaction.Closure<IOException>() {
869                    @Override
870                    public void execute(Transaction tx) throws IOException {
871                        checkpointUpdate(tx, false);
872                    }
873                });
874                closure.execute();
875            } finally {
876                this.indexLock.writeLock().unlock();
877            }
878        }
879    
880        public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
881            int size = data.serializedSizeFramed();
882            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
883            os.writeByte(data.type().getNumber());
884            data.writeFramed(os);
885            return os.toByteSequence();
886        }
887    
888        // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
890        // /////////////////////////////////////////////////////////////////
891        public Location store(JournalCommand<?> data) throws IOException {
892            return store(data, false, null,null);
893        }
894    
895        public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
896            return store(data, false, null,null, onJournalStoreComplete);
897        }
898    
899        public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
900            return store(data, sync, before, after, null);
901        }
902    
903        /**
904         * All updated are are funneled through this method. The updates are converted
905         * to a JournalMessage which is logged to the journal and then the data from
906         * the JournalMessage is used to update the index just like it would be done
907         * during a recovery process.
908         */
909        public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
910            if (before != null) {
911                before.run();
912            }
913            try {
914                ByteSequence sequence = toByteSequence(data);
915                long start = System.currentTimeMillis();
916                Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
917                long start2 = System.currentTimeMillis();
918                process(data, location, after);
919                long end = System.currentTimeMillis();
920                if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
921                    if (LOG.isInfoEnabled()) {
922                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
923                    }
924                }
925    
926                if (after != null) {
927                    Runnable afterCompletion = null;
928                    synchronized (orderedTransactionAfters) {
929                        if (!orderedTransactionAfters.empty()) {
930                            afterCompletion = orderedTransactionAfters.pop();
931                        }
932                    }
933                    if (afterCompletion != null) {
934                        afterCompletion.run();
935                    } else {
936                        // non persistent message case
937                        after.run();
938                    }
939                }
940    
941                if (checkpointThread != null && !checkpointThread.isAlive()) {
942                    startCheckpoint();
943                }
944                return location;
945            } catch (IOException ioe) {
946                LOG.error("KahaDB failed to store to Journal", ioe);
947                brokerService.handleIOException(ioe);
948                throw ioe;
949            }
950        }
951    
952        /**
953         * Loads a previously stored JournalMessage
954         *
955         * @param location
956         * @return
957         * @throws IOException
958         */
959        public JournalCommand<?> load(Location location) throws IOException {
960            long start = System.currentTimeMillis();
961            ByteSequence data = journal.read(location);
962            long end = System.currentTimeMillis();
963            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
964                if (LOG.isInfoEnabled()) {
965                    LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
966                }
967            }
968            DataByteArrayInputStream is = new DataByteArrayInputStream(data);
969            byte readByte = is.readByte();
970            KahaEntryType type = KahaEntryType.valueOf(readByte);
971            if( type == null ) {
972                throw new IOException("Could not load journal record. Invalid location: "+location);
973            }
974            JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
975            message.mergeFramed(is);
976            return message;
977        }
978    
979        /**
980         * do minimal recovery till we reach the last inDoubtLocation
981         * @param data
982         * @param location
983         * @param inDoubtlocation
984         * @throws IOException
985         */
986        void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
987            if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
988                process(data, location, (Runnable) null);
989            } else {
990                // just recover producer audit
991                data.visit(new Visitor() {
992                    @Override
993                    public void visit(KahaAddMessageCommand command) throws IOException {
994                        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
995                    }
996                });
997            }
998        }
999    
1000        // /////////////////////////////////////////////////////////////////
1001        // Journaled record processing methods. Once the record is journaled,
1002        // these methods handle applying the index updates. These may be called
1003        // from the recovery method too so they need to be idempotent
1004        // /////////////////////////////////////////////////////////////////
1005    
1006        void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException {
1007            data.visit(new Visitor() {
1008                @Override
1009                public void visit(KahaAddMessageCommand command) throws IOException {
1010                    process(command, location);
1011                }
1012    
1013                @Override
1014                public void visit(KahaRemoveMessageCommand command) throws IOException {
1015                    process(command, location);
1016                }
1017    
1018                @Override
1019                public void visit(KahaPrepareCommand command) throws IOException {
1020                    process(command, location);
1021                }
1022    
1023                @Override
1024                public void visit(KahaCommitCommand command) throws IOException {
1025                    process(command, location, after);
1026                }
1027    
1028                @Override
1029                public void visit(KahaRollbackCommand command) throws IOException {
1030                    process(command, location);
1031                }
1032    
1033                @Override
1034                public void visit(KahaRemoveDestinationCommand command) throws IOException {
1035                    process(command, location);
1036                }
1037    
1038                @Override
1039                public void visit(KahaSubscriptionCommand command) throws IOException {
1040                    process(command, location);
1041                }
1042    
1043                @Override
1044                public void visit(KahaProducerAuditCommand command) throws IOException {
1045                    processLocation(location);
1046                }
1047    
1048                @Override
1049                public void visit(KahaTraceCommand command) {
1050                    processLocation(location);
1051                }
1052            });
1053        }
1054    
1055        @SuppressWarnings("rawtypes")
1056        protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
1057            if (command.hasTransactionInfo()) {
1058                List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1059                inflightTx.add(new AddOpperation(command, location));
1060            } else {
1061                this.indexLock.writeLock().lock();
1062                try {
1063                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1064                        @Override
1065                        public void execute(Transaction tx) throws IOException {
1066                            upadateIndex(tx, command, location);
1067                        }
1068                    });
1069                } finally {
1070                    this.indexLock.writeLock().unlock();
1071                }
1072            }
1073        }
1074    
1075        @SuppressWarnings("rawtypes")
1076        protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1077            if (command.hasTransactionInfo()) {
1078               List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1079               inflightTx.add(new RemoveOpperation(command, location));
1080            } else {
1081                this.indexLock.writeLock().lock();
1082                try {
1083                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1084                        @Override
1085                        public void execute(Transaction tx) throws IOException {
1086                            updateIndex(tx, command, location);
1087                        }
1088                    });
1089                } finally {
1090                    this.indexLock.writeLock().unlock();
1091                }
1092            }
1093        }
1094    
1095        protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1096            this.indexLock.writeLock().lock();
1097            try {
1098                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1099                    @Override
1100                    public void execute(Transaction tx) throws IOException {
1101                        updateIndex(tx, command, location);
1102                    }
1103                });
1104            } finally {
1105                this.indexLock.writeLock().unlock();
1106            }
1107        }
1108    
1109        protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1110            this.indexLock.writeLock().lock();
1111            try {
1112                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1113                    @Override
1114                    public void execute(Transaction tx) throws IOException {
1115                        updateIndex(tx, command, location);
1116                    }
1117                });
1118            } finally {
1119                this.indexLock.writeLock().unlock();
1120            }
1121        }
1122    
1123        protected void processLocation(final Location location) {
1124            this.indexLock.writeLock().lock();
1125            try {
1126                metadata.lastUpdate = location;
1127            } finally {
1128                this.indexLock.writeLock().unlock();
1129            }
1130        }
1131    
1132        private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
1133        private void push(Runnable after) {
1134            if (after != null) {
1135                synchronized (orderedTransactionAfters) {
1136                    orderedTransactionAfters.push(after);
1137                }
1138            }
1139        }
1140    
1141        @SuppressWarnings("rawtypes")
1142        protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
1143            TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1144            List<Operation> inflightTx;
1145            synchronized (inflightTransactions) {
1146                inflightTx = inflightTransactions.remove(key);
1147                if (inflightTx == null) {
1148                    inflightTx = preparedTransactions.remove(key);
1149                }
1150            }
1151            if (inflightTx == null) {
1152                if (after != null) {
1153                    // since we don't push this after and we may find another, lets run it now
1154                    after.run();
1155                }
1156                return;
1157            }
1158    
1159            final List<Operation> messagingTx = inflightTx;
1160            this.indexLock.writeLock().lock();
1161            try {
1162                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1163                    @Override
1164                    public void execute(Transaction tx) throws IOException {
1165                        for (Operation op : messagingTx) {
1166                            op.execute(tx);
1167                        }
1168                    }
1169                });
1170                metadata.lastUpdate = location;
1171                push(after);
1172            } finally {
1173                this.indexLock.writeLock().unlock();
1174            }
1175        }
1176    
1177        @SuppressWarnings("rawtypes")
1178        protected void process(KahaPrepareCommand command, Location location) {
1179            TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1180            synchronized (inflightTransactions) {
1181                List<Operation> tx = inflightTransactions.remove(key);
1182                if (tx != null) {
1183                    preparedTransactions.put(key, tx);
1184                }
1185            }
1186        }
1187    
1188        @SuppressWarnings("rawtypes")
1189        protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1190            TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1191            List<Operation> updates = null;
1192            synchronized (inflightTransactions) {
1193                updates = inflightTransactions.remove(key);
1194                if (updates == null) {
1195                    updates = preparedTransactions.remove(key);
1196                }
1197            }
1198            if (isRewriteOnRedelivery()) {
1199                persistRedeliveryCount(updates);
1200            }
1201        }
1202    
1203        @SuppressWarnings("rawtypes")
1204        private void persistRedeliveryCount(List<Operation> updates)  throws IOException {
1205            if (updates != null) {
1206                for (Operation operation : updates) {
1207                    operation.getCommand().visit(new Visitor() {
1208                        @Override
1209                        public void visit(KahaRemoveMessageCommand command) throws IOException {
1210                            incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
1211                        }
1212                    });
1213                }
1214            }
1215        }
1216    
1217       abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
1218    
1219        // /////////////////////////////////////////////////////////////////
1220        // These methods do the actual index updates.
1221        // /////////////////////////////////////////////////////////////////
1222    
1223        protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1224        private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1225    
1226        void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1227            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1228    
1229            // Skip adding the message to the index if this is a topic and there are
1230            // no subscriptions.
1231            if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1232                return;
1233            }
1234    
1235            // Add the message.
1236            int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1237            long id = sd.orderIndex.getNextMessageId(priority);
1238            Long previous = sd.locationIndex.put(tx, location, id);
1239            if (previous == null) {
1240                previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1241                if (previous == null) {
1242                    sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1243                    if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1244                        addAckLocationForNewMessage(tx, sd, id);
1245                    }
1246                } else {
1247                    // If the message ID as indexed, then the broker asked us to
1248                    // store a DUP
1249                    // message. Bad BOY! Don't do it, and log a warning.
1250                    LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
1251                    sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1252                    sd.locationIndex.remove(tx, location);
1253                    rollbackStatsOnDuplicate(command.getDestination());
1254                }
1255            } else {
1256                // restore the previous value.. Looks like this was a redo of a
1257                // previously
1258                // added message. We don't want to assign it a new id as the other
1259                // indexes would
1260                // be wrong..
1261                //
1262                sd.locationIndex.put(tx, location, previous);
1263            }
1264            // record this id in any event, initial send or recovery
1265            metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1266            metadata.lastUpdate = location;
1267        }
1268    
1269        abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
1270    
1271        void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1272            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1273            if (!command.hasSubscriptionKey()) {
1274    
1275                // In the queue case we just remove the message from the index..
1276                Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1277                if (sequenceId != null) {
1278                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1279                    if (keys != null) {
1280                        sd.locationIndex.remove(tx, keys.location);
1281                        recordAckMessageReferenceLocation(ackLocation, keys.location);
1282                    }  else if (LOG.isDebugEnabled()) {
1283                        LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1284                    }
1285                } else if (LOG.isDebugEnabled()) {
1286                    LOG.debug("message not found in sequence id index: " + command.getMessageId());
1287                }
1288            } else {
1289                // In the topic case we need remove the message once it's been acked
1290                // by all the subs
1291                Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1292    
1293                // Make sure it's a valid message id...
1294                if (sequence != null) {
1295                    String subscriptionKey = command.getSubscriptionKey();
1296                    if (command.getAck() != UNMATCHED) {
1297                        sd.orderIndex.get(tx, sequence);
1298                        byte priority = sd.orderIndex.lastGetPriority();
1299                        sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1300                    }
1301                    // The following method handles deleting un-referenced messages.
1302                    removeAckLocation(tx, sd, subscriptionKey, sequence);
1303                } else if (LOG.isDebugEnabled()) {
1304                    LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1305                }
1306    
1307            }
1308            metadata.lastUpdate = ackLocation;
1309        }
1310    
1311        Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1312        private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1313            Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1314            if (referenceFileIds == null) {
1315                referenceFileIds = new HashSet<Integer>();
1316                referenceFileIds.add(messageLocation.getDataFileId());
1317                ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1318            } else {
1319                Integer id = Integer.valueOf(messageLocation.getDataFileId());
1320                if (!referenceFileIds.contains(id)) {
1321                    referenceFileIds.add(id);
1322                }
1323            }
1324        }
1325    
1326        void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1327            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1328            sd.orderIndex.remove(tx);
1329    
1330            sd.locationIndex.clear(tx);
1331            sd.locationIndex.unload(tx);
1332            tx.free(sd.locationIndex.getPageId());
1333    
1334            sd.messageIdIndex.clear(tx);
1335            sd.messageIdIndex.unload(tx);
1336            tx.free(sd.messageIdIndex.getPageId());
1337    
1338            if (sd.subscriptions != null) {
1339                sd.subscriptions.clear(tx);
1340                sd.subscriptions.unload(tx);
1341                tx.free(sd.subscriptions.getPageId());
1342    
1343                sd.subscriptionAcks.clear(tx);
1344                sd.subscriptionAcks.unload(tx);
1345                tx.free(sd.subscriptionAcks.getPageId());
1346    
1347                sd.ackPositions.clear(tx);
1348                sd.ackPositions.unload(tx);
1349                tx.free(sd.ackPositions.getHeadPageId());
1350            }
1351    
1352            String key = key(command.getDestination());
1353            storedDestinations.remove(key);
1354            metadata.destinations.remove(tx, key);
1355        }
1356    
1357        void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1358            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1359            final String subscriptionKey = command.getSubscriptionKey();
1360    
1361            // If set then we are creating it.. otherwise we are destroying the sub
1362            if (command.hasSubscriptionInfo()) {
1363                sd.subscriptions.put(tx, subscriptionKey, command);
1364                long ackLocation=NOT_ACKED;
1365                if (!command.getRetroactive()) {
1366                    ackLocation = sd.orderIndex.nextMessageId-1;
1367                } else {
1368                    addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1369                }
1370                sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1371                sd.subscriptionCache.add(subscriptionKey);
1372            } else {
1373                // delete the sub...
1374                sd.subscriptions.remove(tx, subscriptionKey);
1375                sd.subscriptionAcks.remove(tx, subscriptionKey);
1376                sd.subscriptionCache.remove(subscriptionKey);
1377                removeAckLocationsForSub(tx, sd, subscriptionKey);
1378    
1379                if (sd.subscriptions.isEmpty(tx)) {
1380                    sd.messageIdIndex.clear(tx);
1381                    sd.locationIndex.clear(tx);
1382                    sd.orderIndex.clear(tx);
1383                }
1384            }
1385        }
1386    
    /**
     * Writes the current metadata to the page file and, when {@code cleanup} is set, removes
     * the journal data files that nothing refers to any more.
     * <p>
     * Cleanup starts from the set of all journal data files and removes from that candidate
     * set every file that is still needed: the file of the last update, files being replicated,
     * the file holding the producer audit, files spanned by in-progress transactions, files
     * referenced by any destination's location index, and files whose acks refer to a file that
     * is being kept. Whatever remains is handed to the journal for removal.
     *
     * @param tx the transaction used to store the metadata page and to visit the indexes
     * @param cleanup whether unreferenced journal data files should be removed
     * @throws IOException propagated from the page file, index and journal operations
     */
    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
        LOG.debug("Checkpoint started.");

        // reflect last update exclusive of current checkpoint
        Location lastUpdate = metadata.lastUpdate;

        metadata.state = OPEN_STATE;
        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
        Location[] inProgressTxRange = getInProgressTxLocationRange();
        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
        tx.store(metadata.page, metadataMarshaller, true);
        pageFile.flush();

        if( cleanup ) {

            // completeFileSet stays untouched; gcCandidateSet is whittled down below.
            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);

            if (LOG.isTraceEnabled()) {
                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
            }

            // Don't GC the file that holds the last update
            if (lastUpdate != null) {
                gcCandidateSet.remove(lastUpdate.getDataFileId());
            }

            // Don't GC files under replication
            if( journalFilesBeingReplicated!=null ) {
                gcCandidateSet.removeAll(journalFilesBeingReplicated);
            }

            // Don't GC the file that holds the producer audit
            if (metadata.producerSequenceIdTrackerLocation != null) {
                gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
            }

            // Don't GC files referenced by in-progress tx
            if (inProgressTxRange[0] != null) {
                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
                    gcCandidateSet.remove(pendingTx);
                }
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
            }

            // Go through all the destinations to see if any of them can remove GC candidates.
            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
                if( gcCandidateSet.isEmpty() ) {
                    break;
                }

                // Use a visitor to cut down the number of pages that we load
                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                    // file id of the previously visited key, used to skip repeated removals
                    int last=-1;
                    @Override
                    public boolean isInterestedInKeysBetween(Location first, Location second) {
                        // headSet/tailSet/subSet return views backed by gcCandidateSet, so the
                        // remove() calls below take the boundary file ids out of the candidate
                        // set itself. The range is only worth visiting if candidates remain
                        // strictly between the boundaries.
                        if( first==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else if( second==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else {
                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        }
                    }

                    @Override
                    public void visit(List<Location> keys, List<Long> values) {
                        // every file that still holds an indexed message must be kept
                        for (Location l : keys) {
                            int fileId = l.getDataFileId();
                            if( last != fileId ) {
                                gcCandidateSet.remove(fileId);
                                last = fileId;
                            }
                        }
                    }
                });
                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
                }
            }

            // check we are not deleting file with ack for in-use journal files
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates: " + gcCandidateSet);
            }
            // Snapshot of the candidates taken before the loop; the checks below use it so they
            // are not affected by removals made while iterating.
            final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
            Iterator<Integer> candidates = gcCandidateSet.iterator();
            while (candidates.hasNext()) {
                Integer candidate = candidates.next();
                Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
                if (referencedFileIds != null) {
                    for (Integer referencedFileId : referencedFileIds) {
                        if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
                            // active file that is not targeted for deletion is referenced so don't delete
                            candidates.remove();
                            break;
                        }
                    }
                    if (gcCandidateSet.contains(candidate)) {
                        // still a candidate: the file will be removed, so its ack bookkeeping goes too
                        ackMessageFileMap.remove(candidate);
                    } else {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("not removing data file: " + candidate
                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
                        }
                    }
                }
            }

            if (!gcCandidateSet.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
                }
                journal.removeDataFiles(gcCandidateSet);
            }
        }

        LOG.debug("Checkpoint done.");
    }
1525    
    // Completion callback that does nothing; checkpointProducerAudit() passes it to store(...)
    // when it only needs the returned location and has no work to run on completion.
    final Runnable nullCompletionCallback = new Runnable() {
        @Override
        public void run() {
            // intentionally empty
        }
    };
1531        private Location checkpointProducerAudit() throws IOException {
1532            if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
1533                ByteArrayOutputStream baos = new ByteArrayOutputStream();
1534                ObjectOutputStream oout = new ObjectOutputStream(baos);
1535                oout.writeObject(metadata.producerSequenceIdTracker);
1536                oout.flush();
1537                oout.close();
1538                // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1539                Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1540                try {
1541                    location.getLatch().await();
1542                } catch (InterruptedException e) {
1543                    throw new InterruptedIOException(e.toString());
1544                }
1545                return location;
1546            }
1547            return metadata.producerSequenceIdTrackerLocation;
1548        }
1549    
    /**
     * Returns the ids of the journal data files currently being replicated; checkpoint cleanup
     * never removes these files.
     *
     * @return the internal set itself (not a copy); may be {@code null}
     */
    public HashSet<Integer> getJournalFilesBeingReplicated() {
        return journalFilesBeingReplicated;
    }
1553    
1554        // /////////////////////////////////////////////////////////////////
1555        // StoredDestination related implementation methods.
1556        // /////////////////////////////////////////////////////////////////
1557    
    // In-memory cache of the destinations loaded so far, keyed by the string produced by
    // key(destination).
    private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1559    
    /**
     * Plain holder for the state of one subscription.
     * NOTE(review): not referenced anywhere in this part of the file; the field meanings below
     * are inferred from their names and types only - confirm against the callers.
     */
    class StoredSubscription {
        SubscriptionInfo subscriptionInfo;
        // presumably the id of the last acknowledged message
        String lastAckId;
        // presumably the journal location of that last ack
        Location lastAckLocation;
        // presumably the current read position of the subscription
        Location cursor;
    }
1566    
1567        static class MessageKeys {
1568            final String messageId;
1569            final Location location;
1570    
1571            public MessageKeys(String messageId, Location location) {
1572                this.messageId=messageId;
1573                this.location=location;
1574            }
1575    
1576            @Override
1577            public String toString() {
1578                return "["+messageId+","+location+"]";
1579            }
1580        }
1581    
1582        static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1583            static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1584    
1585            @Override
1586            public MessageKeys readPayload(DataInput dataIn) throws IOException {
1587                return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1588            }
1589    
1590            @Override
1591            public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1592                dataOut.writeUTF(object.messageId);
1593                LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1594            }
1595        }
1596    
1597        class LastAck {
1598            long lastAckedSequence;
1599            byte priority;
1600    
1601            public LastAck(LastAck source) {
1602                this.lastAckedSequence = source.lastAckedSequence;
1603                this.priority = source.priority;
1604            }
1605    
1606            public LastAck() {
1607                this.priority = MessageOrderIndex.HI;
1608            }
1609    
1610            public LastAck(long ackLocation) {
1611                this.lastAckedSequence = ackLocation;
1612                this.priority = MessageOrderIndex.LO;
1613            }
1614    
1615            public LastAck(long ackLocation, byte priority) {
1616                this.lastAckedSequence = ackLocation;
1617                this.priority = priority;
1618            }
1619    
1620            @Override
1621            public String toString() {
1622                return "[" + lastAckedSequence + ":" + priority + "]";
1623            }
1624        }
1625    
1626        protected class LastAckMarshaller implements Marshaller<LastAck> {
1627    
1628            @Override
1629            public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1630                dataOut.writeLong(object.lastAckedSequence);
1631                dataOut.writeByte(object.priority);
1632            }
1633    
1634            @Override
1635            public LastAck readPayload(DataInput dataIn) throws IOException {
1636                LastAck lastAcked = new LastAck();
1637                lastAcked.lastAckedSequence = dataIn.readLong();
1638                if (metadata.version >= 3) {
1639                    lastAcked.priority = dataIn.readByte();
1640                }
1641                return lastAcked;
1642            }
1643    
1644            @Override
1645            public int getFixedSize() {
1646                return 9;
1647            }
1648    
1649            @Override
1650            public LastAck deepCopy(LastAck source) {
1651                return new LastAck(source);
1652            }
1653    
1654            @Override
1655            public boolean isDeepCopySupported() {
1656                return true;
1657            }
1658        }
1659    
    /**
     * The set of indexes kept for one destination, plus transient bookkeeping rebuilt when the
     * destination is loaded.
     */
    class StoredDestination {

        // message sequence -> MessageKeys, split over default/low/high priority indexes
        MessageOrderIndex orderIndex = new MessageOrderIndex();
        // journal location -> Long (presumably the message sequence; confirm against the add path)
        BTreeIndex<Location, Long> locationIndex;
        // message id -> message sequence
        BTreeIndex<String, Long> messageIdIndex;

        // These bits are only set for Topics
        // subscription key -> the command that created the subscription
        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
        // subscription key -> last acknowledged position
        BTreeIndex<String, LastAck> subscriptionAcks;
        HashMap<String, MessageOrderCursor> subscriptionCursors;
        // subscription key -> sequences still pending an ack from that subscription
        ListIndex<String, SequenceSet> ackPositions;

        // Transient data used to track which Messages are no longer needed.
        // message sequence -> number of subscriptions still referencing it
        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
        // keys of the known subscriptions, in insertion order
        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
    }
1676    
1677        protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1678    
1679            @Override
1680            public StoredDestination readPayload(final DataInput dataIn) throws IOException {
1681                final StoredDestination value = new StoredDestination();
1682                value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1683                value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1684                value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1685    
1686                if (dataIn.readBoolean()) {
1687                    value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1688                    value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
1689                    if (metadata.version >= 4) {
1690                        value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
1691                    } else {
1692                        // upgrade
1693                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
1694                            @Override
1695                            public void execute(Transaction tx) throws IOException {
1696                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
1697                                    new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
1698                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1699                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1700                                oldAckPositions.load(tx);
1701    
1702                                LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
1703    
1704                                // Do the initial build of the data in memory before writing into the store
1705                                // based Ack Positions List to avoid a lot of disk thrashing.
1706                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
1707                                while (iterator.hasNext()) {
1708                                    Entry<Long, HashSet<String>> entry = iterator.next();
1709    
1710                                    for(String subKey : entry.getValue()) {
1711                                        SequenceSet pendingAcks = temp.get(subKey);
1712                                        if (pendingAcks == null) {
1713                                            pendingAcks = new SequenceSet();
1714                                            temp.put(subKey, pendingAcks);
1715                                        }
1716    
1717                                        pendingAcks.add(entry.getKey());
1718                                    }
1719                                }
1720    
1721                                // Now move the pending messages to ack data into the store backed
1722                                // structure.
1723                                value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1724                                for(String subscriptionKey : temp.keySet()) {
1725                                    value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
1726                                }
1727    
1728                            }
1729                        });
1730                    }
1731                }
1732                if (metadata.version >= 2) {
1733                    value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1734                    value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1735                } else {
1736                        // upgrade
1737                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
1738                            @Override
1739                            public void execute(Transaction tx) throws IOException {
1740                                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1741                                value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1742                                value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1743                                value.orderIndex.lowPriorityIndex.load(tx);
1744    
1745                                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1746                                value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1747                                value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1748                                value.orderIndex.highPriorityIndex.load(tx);
1749                            }
1750                        });
1751                }
1752    
1753                return value;
1754            }
1755    
1756            @Override
1757            public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
1758                dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
1759                dataOut.writeLong(value.locationIndex.getPageId());
1760                dataOut.writeLong(value.messageIdIndex.getPageId());
1761                if (value.subscriptions != null) {
1762                    dataOut.writeBoolean(true);
1763                    dataOut.writeLong(value.subscriptions.getPageId());
1764                    dataOut.writeLong(value.subscriptionAcks.getPageId());
1765                    dataOut.writeLong(value.ackPositions.getHeadPageId());
1766                } else {
1767                    dataOut.writeBoolean(false);
1768                }
1769                dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
1770                dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
1771            }
1772        }
1773    
1774        static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1775            final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1776    
1777            @Override
1778            public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1779                KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1780                rc.mergeFramed((InputStream)dataIn);
1781                return rc;
1782            }
1783    
1784            @Override
1785            public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1786                object.writeFramed((OutputStream)dataOut);
1787            }
1788        }
1789    
1790        protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1791            String key = key(destination);
1792            StoredDestination rc = storedDestinations.get(key);
1793            if (rc == null) {
1794                boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1795                rc = loadStoredDestination(tx, key, topic);
1796                // Cache it. We may want to remove/unload destinations from the
1797                // cache that are not used for a while
1798                // to reduce memory usage.
1799                storedDestinations.put(key, rc);
1800            }
1801            return rc;
1802        }
1803    
1804        protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1805            String key = key(destination);
1806            StoredDestination rc = storedDestinations.get(key);
1807            if (rc == null && metadata.destinations.containsKey(tx, key)) {
1808                rc = getStoredDestination(destination, tx);
1809            }
1810            return rc;
1811        }
1812    
1813        /**
1814         * @param tx
1815         * @param key
1816         * @param topic
1817         * @return
1818         * @throws IOException
1819         */
1820        private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1821            // Try to load the existing indexes..
1822            StoredDestination rc = metadata.destinations.get(tx, key);
1823            if (rc == null) {
1824                // Brand new destination.. allocate indexes for it.
1825                rc = new StoredDestination();
1826                rc.orderIndex.allocate(tx);
1827                rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1828                rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1829    
1830                if (topic) {
1831                    rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1832                    rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1833                    rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1834                }
1835                metadata.destinations.put(tx, key, rc);
1836            }
1837    
1838            // Configure the marshalers and load.
1839            rc.orderIndex.load(tx);
1840    
1841            // Figure out the next key using the last entry in the destination.
1842            rc.orderIndex.configureLast(tx);
1843    
1844            rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
1845            rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1846            rc.locationIndex.load(tx);
1847    
1848            rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1849            rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1850            rc.messageIdIndex.load(tx);
1851    
1852            // If it was a topic...
1853            if (topic) {
1854    
1855                rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1856                rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1857                rc.subscriptions.load(tx);
1858    
1859                rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1860                rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1861                rc.subscriptionAcks.load(tx);
1862    
1863                rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
1864                rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
1865                rc.ackPositions.load(tx);
1866    
1867                rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1868    
1869                if (metadata.version < 3) {
1870    
1871                    // on upgrade need to fill ackLocation with available messages past last ack
1872                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1873                        Entry<String, LastAck> entry = iterator.next();
1874                        for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1875                                rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1876                            Long sequence = orderIterator.next().getKey();
1877                            addAckLocation(tx, rc, sequence, entry.getKey());
1878                        }
1879                        // modify so it is upgraded
1880                        rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1881                    }
1882                }
1883    
1884                // Configure the message references index
1885                Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
1886                while (subscriptions.hasNext()) {
1887                    Entry<String, SequenceSet> subscription = subscriptions.next();
1888                    SequenceSet pendingAcks = subscription.getValue();
1889                    if (pendingAcks != null && !pendingAcks.isEmpty()) {
1890                        Long lastPendingAck = pendingAcks.getTail().getLast();
1891                        for(Long sequenceId : pendingAcks) {
1892                            Long current = rc.messageReferences.get(sequenceId);
1893                            if (current == null) {
1894                                current = new Long(0);
1895                            }
1896    
1897                            // We always add a trailing empty entry for the next position to start from
1898                            // so we need to ensure we don't count that as a message reference on reload.
1899                            if (!sequenceId.equals(lastPendingAck)) {
1900                                current = current.longValue() + 1;
1901                            }
1902    
1903                            rc.messageReferences.put(sequenceId, current);
1904                        }
1905                    }
1906                }
1907    
1908                // Configure the subscription cache
1909                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1910                    Entry<String, LastAck> entry = iterator.next();
1911                    rc.subscriptionCache.add(entry.getKey());
1912                }
1913    
1914                if (rc.orderIndex.nextMessageId == 0) {
1915                    // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1916                    if (!rc.subscriptionAcks.isEmpty(tx)) {
1917                        for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1918                            Entry<String, LastAck> entry = iterator.next();
1919                            rc.orderIndex.nextMessageId =
1920                                    Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1921                        }
1922                    }
1923                } else {
1924                    // update based on ackPositions for unmatched, last entry is always the next
1925                    if (!rc.messageReferences.isEmpty()) {
1926                        Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
1927                        rc.orderIndex.nextMessageId =
1928                                Math.max(rc.orderIndex.nextMessageId, nextMessageId);
1929                    }
1930                }
1931            }
1932    
1933            if (metadata.version < VERSION) {
1934                // store again after upgrade
1935                metadata.destinations.put(tx, key, rc);
1936            }
1937            return rc;
1938        }
1939    
1940        private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1941            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1942            if (sequences == null) {
1943                sequences = new SequenceSet();
1944                sequences.add(messageSequence);
1945                sd.ackPositions.add(tx, subscriptionKey, sequences);
1946            } else {
1947                sequences.add(messageSequence);
1948                sd.ackPositions.put(tx, subscriptionKey, sequences);
1949            }
1950    
1951            Long count = sd.messageReferences.get(messageSequence);
1952            if (count == null) {
1953                count = Long.valueOf(0L);
1954            }
1955            count = count.longValue() + 1;
1956            sd.messageReferences.put(messageSequence, count);
1957        }
1958    
1959        // new sub is interested in potentially all existing messages
1960        private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1961            SequenceSet allOutstanding = new SequenceSet();
1962            Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
1963            while (iterator.hasNext()) {
1964                SequenceSet set = iterator.next().getValue();
1965                for (Long entry : set) {
1966                    allOutstanding.add(entry);
1967                }
1968            }
1969            sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
1970    
1971            for (Long ackPosition : allOutstanding) {
1972                Long count = sd.messageReferences.get(ackPosition);
1973                count = count.longValue() + 1;
1974                sd.messageReferences.put(ackPosition, count);
1975            }
1976        }
1977    
1978        // on a new message add, all existing subs are interested in this message
1979        private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1980            for(String subscriptionKey : sd.subscriptionCache) {
1981                SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1982                if (sequences == null) {
1983                    sequences = new SequenceSet();
1984                    sequences.add(new Sequence(messageSequence, messageSequence + 1));
1985                    sd.ackPositions.add(tx, subscriptionKey, sequences);
1986                } else {
1987                    sequences.add(new Sequence(messageSequence, messageSequence + 1));
1988                    sd.ackPositions.put(tx, subscriptionKey, sequences);
1989                }
1990    
1991                Long count = sd.messageReferences.get(messageSequence);
1992                if (count == null) {
1993                    count = Long.valueOf(0L);
1994                }
1995                count = count.longValue() + 1;
1996                sd.messageReferences.put(messageSequence, count);
1997                sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
1998            }
1999        }
2000    
2001        private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2002            if (!sd.ackPositions.isEmpty(tx)) {
2003                SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2004                if (sequences == null || sequences.isEmpty()) {
2005                    return;
2006                }
2007    
2008                ArrayList<Long> unreferenced = new ArrayList<Long>();
2009    
2010                for(Long sequenceId : sequences) {
2011                    Long references = sd.messageReferences.get(sequenceId);
2012                    if (references != null) {
2013                        references = references.longValue() - 1;
2014    
2015                        if (references.longValue() > 0) {
2016                            sd.messageReferences.put(sequenceId, references);
2017                        } else {
2018                            sd.messageReferences.remove(sequenceId);
2019                            unreferenced.add(sequenceId);
2020                        }
2021                    }
2022                }
2023    
2024                for(Long sequenceId : unreferenced) {
2025                    // Find all the entries that need to get deleted.
2026                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2027                    sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2028    
2029                    // Do the actual deletes.
2030                    for (Entry<Long, MessageKeys> entry : deletes) {
2031                        sd.locationIndex.remove(tx, entry.getValue().location);
2032                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2033                        sd.orderIndex.remove(tx, entry.getKey());
2034                    }
2035                }
2036            }
2037        }
2038    
2039        /**
2040         * @param tx
2041         * @param sd
2042         * @param subscriptionKey
2043         * @param messageSequence
2044         * @throws IOException
2045         */
2046        private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
2047            // Remove the sub from the previous location set..
2048            if (messageSequence != null) {
2049                SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2050                if (range != null && !range.isEmpty()) {
2051                    range.remove(messageSequence);
2052                    if (!range.isEmpty()) {
2053                        sd.ackPositions.put(tx, subscriptionKey, range);
2054                    } else {
2055                        sd.ackPositions.remove(tx, subscriptionKey);
2056                    }
2057    
2058                    // Check if the message is reference by any other subscription.
2059                    Long count = sd.messageReferences.get(messageSequence);
2060                    if (count != null){
2061                    long references = count.longValue() - 1;
2062                        if (references > 0) {
2063                            sd.messageReferences.put(messageSequence, Long.valueOf(references));
2064                            return;
2065                        } else {
2066                            sd.messageReferences.remove(messageSequence);
2067                        }
2068                    }
2069    
2070                    // Find all the entries that need to get deleted.
2071                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2072                    sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2073    
2074                    // Do the actual deletes.
2075                    for (Entry<Long, MessageKeys> entry : deletes) {
2076                        sd.locationIndex.remove(tx, entry.getValue().location);
2077                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2078                        sd.orderIndex.remove(tx, entry.getKey());
2079                    }
2080                }
2081            }
2082        }
2083    
2084        public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2085            return sd.subscriptionAcks.get(tx, subscriptionKey);
2086        }
2087    
2088        public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2089            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2090            if (messageSequences != null) {
2091                long result = messageSequences.rangeSize();
2092                // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2093                return result > 0 ? result - 1 : 0;
2094            }
2095    
2096            return 0;
2097        }
2098    
2099        private String key(KahaDestination destination) {
2100            return destination.getType().getNumber() + ":" + destination.getName();
2101        }
2102    
2103        // /////////////////////////////////////////////////////////////////
2104        // Transaction related implementation methods.
2105        // /////////////////////////////////////////////////////////////////
    // Operations recorded per transaction id; entries are created lazily by
    // getInflightTx() while synchronized on this map.
    @SuppressWarnings("rawtypes")
    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
    // Operations per transaction id for prepared transactions
    // (NOTE(review): populated outside this section - confirm the hand-over from inflightTransactions).
    @SuppressWarnings("rawtypes")
    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
    // String form of the last message id of each ack registered through
    // trackRecoveredAcks(); modified there and in forgetRecoveredAcks() under the index write lock.
    protected final Set<String> ackedAndPrepared = new HashSet<String>();
2111    
2112        // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2113        // till then they are skipped by the store.
2114        // 'at most once' XA guarantee
2115        public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2116            this.indexLock.writeLock().lock();
2117            try {
2118                for (MessageAck ack : acks) {
2119                    ackedAndPrepared.add(ack.getLastMessageId().toString());
2120                }
2121            } finally {
2122                this.indexLock.writeLock().unlock();
2123            }
2124        }
2125    
2126        public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
2127            if (acks != null) {
2128                this.indexLock.writeLock().lock();
2129                try {
2130                    for (MessageAck ack : acks) {
2131                        ackedAndPrepared.remove(ack.getLastMessageId().toString());
2132                    }
2133                } finally {
2134                    this.indexLock.writeLock().unlock();
2135                }
2136            }
2137        }
2138    
2139        @SuppressWarnings("rawtypes")
2140        private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
2141            TransactionId key = TransactionIdConversion.convert(info);
2142            List<Operation> tx;
2143            synchronized (inflightTransactions) {
2144                tx = inflightTransactions.get(key);
2145                if (tx == null) {
2146                    tx = Collections.synchronizedList(new ArrayList<Operation>());
2147                    inflightTransactions.put(key, tx);
2148                }
2149            }
2150            return tx;
2151        }
2152    
2153        @SuppressWarnings("unused")
2154        private TransactionId key(KahaTransactionInfo transactionInfo) {
2155            return TransactionIdConversion.convert(transactionInfo);
2156        }
2157    
2158        abstract class Operation <T extends JournalCommand<T>> {
2159            final T command;
2160            final Location location;
2161    
2162            public Operation(T command, Location location) {
2163                this.command = command;
2164                this.location = location;
2165            }
2166    
2167            public Location getLocation() {
2168                return location;
2169            }
2170    
2171            public T getCommand() {
2172                return command;
2173            }
2174    
2175            abstract public void execute(Transaction tx) throws IOException;
2176        }
2177    
2178        class AddOpperation extends Operation<KahaAddMessageCommand> {
2179    
2180            public AddOpperation(KahaAddMessageCommand command, Location location) {
2181                super(command, location);
2182            }
2183    
2184            @Override
2185            public void execute(Transaction tx) throws IOException {
2186                upadateIndex(tx, command, location);
2187            }
2188    
2189        }
2190    
2191        class RemoveOpperation extends Operation<KahaRemoveMessageCommand> {
2192    
2193            public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
2194                super(command, location);
2195            }
2196    
2197            @Override
2198            public void execute(Transaction tx) throws IOException {
2199                updateIndex(tx, command, location);
2200            }
2201        }
2202    
2203        // /////////////////////////////////////////////////////////////////
2204        // Initialization related implementation methods.
2205        // /////////////////////////////////////////////////////////////////
2206    
2207        private PageFile createPageFile() {
2208            PageFile index = new PageFile(directory, "db");
2209            index.setEnableWriteThread(isEnableIndexWriteAsync());
2210            index.setWriteBatchSize(getIndexWriteBatchSize());
2211            index.setPageCacheSize(indexCacheSize);
2212            index.setUseLFRUEviction(isUseIndexLFRUEviction());
2213            index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2214            index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2215            index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2216            index.setEnablePageCaching(isEnableIndexPageCaching());
2217            return index;
2218        }
2219    
2220        private Journal createJournal() throws IOException {
2221            Journal manager = new Journal();
2222            manager.setDirectory(directory);
2223            manager.setMaxFileLength(getJournalMaxFileLength());
2224            manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2225            manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2226            manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2227            manager.setArchiveDataLogs(isArchiveDataLogs());
2228            manager.setSizeAccumulator(journalSize);
2229            manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2230            if (getDirectoryArchive() != null) {
2231                IOHelper.mkdirs(getDirectoryArchive());
2232                manager.setDirectoryArchive(getDirectoryArchive());
2233            }
2234            return manager;
2235        }
2236    
2237        public int getJournalMaxWriteBatchSize() {
2238            return journalMaxWriteBatchSize;
2239        }
2240    
    /** @param journalMaxWriteBatchSize the write batch size that createJournal() applies to the journal */
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
    }
2244    
2245        public File getDirectory() {
2246            return directory;
2247        }
2248    
    /** @param directory the directory used for both the page file and the journal */
    public void setDirectory(File directory) {
        this.directory = directory;
    }
2252    
2253        public boolean isDeleteAllMessages() {
2254            return deleteAllMessages;
2255        }
2256    
    /** @param deleteAllMessages the new value of the deleteAllMessages flag */
    public void setDeleteAllMessages(boolean deleteAllMessages) {
        this.deleteAllMessages = deleteAllMessages;
    }
2260    
    /**
     * Sets the write batch size that createPageFile() applies to the index
     * page file. Note that the backing field is itself named
     * {@code setIndexWriteBatchSize}.
     */
    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
    }
2264    
2265        public int getIndexWriteBatchSize() {
2266            return setIndexWriteBatchSize;
2267        }
2268    
    /** @param enableIndexWriteAsync value that createPageFile() passes to PageFile.setEnableWriteThread */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }
2272    
2273        boolean isEnableIndexWriteAsync() {
2274            return enableIndexWriteAsync;
2275        }
2276    
2277        public boolean isEnableJournalDiskSyncs() {
2278            return enableJournalDiskSyncs;
2279        }
2280    
2281        public void setEnableJournalDiskSyncs(boolean syncWrites) {
2282            this.enableJournalDiskSyncs = syncWrites;
2283        }
2284    
2285        public long getCheckpointInterval() {
2286            return checkpointInterval;
2287        }
2288    
    /** @param checkpointInterval the checkpoint interval to use */
    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }
2292    
2293        public long getCleanupInterval() {
2294            return cleanupInterval;
2295        }
2296    
    /** @param cleanupInterval the cleanup interval to use */
    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }
2300    
    /** @param journalMaxFileLength value that createJournal() passes to Journal.setMaxFileLength */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }
2304    
2305        public int getJournalMaxFileLength() {
2306            return journalMaxFileLength;
2307        }
2308    
2309        public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
2310            this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
2311        }
2312    
2313        public int getMaxFailoverProducersToTrack() {
2314            return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
2315        }
2316    
2317        public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
2318            this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
2319        }
2320    
2321        public int getFailoverProducersAuditDepth() {
2322            return this.metadata.producerSequenceIdTracker.getAuditDepth();
2323        }
2324    
2325        public PageFile getPageFile() {
2326            if (pageFile == null) {
2327                pageFile = createPageFile();
2328            }
2329            return pageFile;
2330        }
2331    
2332        public Journal getJournal() throws IOException {
2333            if (journal == null) {
2334                journal = createJournal();
2335            }
2336            return journal;
2337        }
2338    
2339        public boolean isFailIfDatabaseIsLocked() {
2340            return failIfDatabaseIsLocked;
2341        }
2342    
    /** @param failIfDatabaseIsLocked the new value of the failIfDatabaseIsLocked flag */
    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
    }
2346    
2347        public boolean isIgnoreMissingJournalfiles() {
2348            return ignoreMissingJournalfiles;
2349        }
2350    
    /** @param ignoreMissingJournalfiles the new value of the ignoreMissingJournalfiles flag */
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
    }
2354    
2355        public int getIndexCacheSize() {
2356            return indexCacheSize;
2357        }
2358    
    /** @param indexCacheSize value that createPageFile() passes to PageFile.setPageCacheSize */
    public void setIndexCacheSize(int indexCacheSize) {
        this.indexCacheSize = indexCacheSize;
    }
2362    
2363        public boolean isCheckForCorruptJournalFiles() {
2364            return checkForCorruptJournalFiles;
2365        }
2366    
    /**
     * @param checkForCorruptJournalFiles whether createJournal() enables the
     *        journal's corruption check on startup (which also turns journal checksums on)
     */
    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
    }
2370    
2371        public boolean isChecksumJournalFiles() {
2372            return checksumJournalFiles;
2373        }
2374    
    /**
     * @param checksumJournalFiles the checksumJournalFiles flag; createJournal()
     *        enables journal checksums when this flag or the corruption check flag is set
     */
    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        this.checksumJournalFiles = checksumJournalFiles;
    }
2378    
    /**
     * Stores the broker service reference handed to this store.
     *
     * @param brokerService the broker service to remember
     */
    @Override
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }
2383    
2384        /**
2385         * @return the archiveDataLogs
2386         */
2387        public boolean isArchiveDataLogs() {
2388            return this.archiveDataLogs;
2389        }
2390    
    /**
     * @param archiveDataLogs the archiveDataLogs flag that createJournal()
     *        passes to Journal.setArchiveDataLogs
     */
    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }
2397    
2398        /**
2399         * @return the directoryArchive
2400         */
2401        public File getDirectoryArchive() {
2402            return this.directoryArchive;
2403        }
2404    
    /**
     * @param directoryArchive the archive directory to set; createJournal()
     *        creates and applies it only when it is non-null
     */
    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }
2411    
2412        public boolean isRewriteOnRedelivery() {
2413            return rewriteOnRedelivery;
2414        }
2415    
    /** @param rewriteOnRedelivery the new value of the rewriteOnRedelivery flag */
    public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
        this.rewriteOnRedelivery = rewriteOnRedelivery;
    }
2419    
2420        public boolean isArchiveCorruptedIndex() {
2421            return archiveCorruptedIndex;
2422        }
2423    
    /** @param archiveCorruptedIndex the new value of the archiveCorruptedIndex flag */
    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
        this.archiveCorruptedIndex = archiveCorruptedIndex;
    }
2427    
2428        public float getIndexLFUEvictionFactor() {
2429            return indexLFUEvictionFactor;
2430        }
2431    
    /** @param indexLFUEvictionFactor value that createPageFile() passes to PageFile.setLFUEvictionFactor */
    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
    }
2435    
2436        public boolean isUseIndexLFRUEviction() {
2437            return useIndexLFRUEviction;
2438        }
2439    
    /** @param useIndexLFRUEviction value that createPageFile() passes to PageFile.setUseLFRUEviction */
    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
        this.useIndexLFRUEviction = useIndexLFRUEviction;
    }
2443    
    /** @param enableIndexDiskSyncs value that createPageFile() passes to PageFile.setEnableDiskSyncs */
    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
    }
2447    
    /** @param enableIndexRecoveryFile value that createPageFile() passes to PageFile.setEnableRecoveryFile */
    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
    }
2451    
    /** @param enableIndexPageCaching value that createPageFile() passes to PageFile.setEnablePageCaching */
    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
        this.enableIndexPageCaching = enableIndexPageCaching;
    }
2455    
2456        public boolean isEnableIndexDiskSyncs() {
2457            return enableIndexDiskSyncs;
2458        }
2459    
2460        public boolean isEnableIndexRecoveryFile() {
2461            return enableIndexRecoveryFile;
2462        }
2463    
2464        public boolean isEnableIndexPageCaching() {
2465            return enableIndexPageCaching;
2466        }
2467    
2468        // /////////////////////////////////////////////////////////////////
2469        // Internal conversion methods.
2470        // /////////////////////////////////////////////////////////////////
2471    
2472        class MessageOrderCursor{
2473            long defaultCursorPosition;
2474            long lowPriorityCursorPosition;
2475            long highPriorityCursorPosition;
2476            MessageOrderCursor(){
2477            }
2478    
2479            MessageOrderCursor(long position){
2480                this.defaultCursorPosition=position;
2481                this.lowPriorityCursorPosition=position;
2482                this.highPriorityCursorPosition=position;
2483            }
2484    
2485            MessageOrderCursor(MessageOrderCursor other){
2486                this.defaultCursorPosition=other.defaultCursorPosition;
2487                this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2488                this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2489            }
2490    
2491            MessageOrderCursor copy() {
2492                return new MessageOrderCursor(this);
2493            }
2494    
2495            void reset() {
2496                this.defaultCursorPosition=0;
2497                this.highPriorityCursorPosition=0;
2498                this.lowPriorityCursorPosition=0;
2499            }
2500    
2501            void increment() {
2502                if (defaultCursorPosition!=0) {
2503                    defaultCursorPosition++;
2504                }
2505                if (highPriorityCursorPosition!=0) {
2506                    highPriorityCursorPosition++;
2507                }
2508                if (lowPriorityCursorPosition!=0) {
2509                    lowPriorityCursorPosition++;
2510                }
2511            }
2512    
2513            @Override
2514            public String toString() {
2515               return "MessageOrderCursor:[def:" + defaultCursorPosition
2516                       + ", low:" + lowPriorityCursorPosition
2517                       + ", high:" +  highPriorityCursorPosition + "]";
2518            }
2519    
2520            public void sync(MessageOrderCursor other) {
2521                this.defaultCursorPosition=other.defaultCursorPosition;
2522                this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2523                this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2524            }
2525        }
2526    
2527        class MessageOrderIndex {
2528            static final byte HI = 9;
2529            static final byte LO = 0;
2530            static final byte DEF = 4;
2531    
2532            long nextMessageId;
2533            BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
2534            BTreeIndex<Long, MessageKeys> lowPriorityIndex;
2535            BTreeIndex<Long, MessageKeys> highPriorityIndex;
2536            MessageOrderCursor cursor = new MessageOrderCursor();
2537            Long lastDefaultKey;
2538            Long lastHighKey;
2539            Long lastLowKey;
2540            byte lastGetPriority;
2541    
2542            MessageKeys remove(Transaction tx, Long key) throws IOException {
2543                MessageKeys result = defaultPriorityIndex.remove(tx, key);
2544                if (result == null && highPriorityIndex!=null) {
2545                    result = highPriorityIndex.remove(tx, key);
2546                    if (result ==null && lowPriorityIndex!=null) {
2547                        result = lowPriorityIndex.remove(tx, key);
2548                    }
2549                }
2550                return result;
2551            }
2552    
2553            void load(Transaction tx) throws IOException {
2554                defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2555                defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2556                defaultPriorityIndex.load(tx);
2557                lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2558                lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2559                lowPriorityIndex.load(tx);
2560                highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2561                highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2562                highPriorityIndex.load(tx);
2563            }
2564    
2565            void allocate(Transaction tx) throws IOException {
2566                defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2567                if (metadata.version >= 2) {
2568                    lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2569                    highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2570                }
2571            }
2572    
2573            void configureLast(Transaction tx) throws IOException {
2574                // Figure out the next key using the last entry in the destination.
2575                if (highPriorityIndex != null) {
2576                    Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2577                    if (lastEntry != null) {
2578                        nextMessageId = lastEntry.getKey() + 1;
2579                    } else {
2580                        lastEntry = defaultPriorityIndex.getLast(tx);
2581                        if (lastEntry != null) {
2582                            nextMessageId = lastEntry.getKey() + 1;
2583                        } else {
2584                            lastEntry = lowPriorityIndex.getLast(tx);
2585                            if (lastEntry != null) {
2586                                nextMessageId = lastEntry.getKey() + 1;
2587                            }
2588                        }
2589                    }
2590                } else {
2591                    Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2592                    if (lastEntry != null) {
2593                        nextMessageId = lastEntry.getKey() + 1;
2594                    }
2595                }
2596            }
2597    
2598            void clear(Transaction tx) throws IOException {
2599                this.remove(tx);
2600                this.resetCursorPosition();
2601                this.allocate(tx);
2602                this.load(tx);
2603                this.configureLast(tx);
2604            }
2605    
2606            void remove(Transaction tx) throws IOException {
2607                defaultPriorityIndex.clear(tx);
2608                defaultPriorityIndex.unload(tx);
2609                tx.free(defaultPriorityIndex.getPageId());
2610                if (lowPriorityIndex != null) {
2611                    lowPriorityIndex.clear(tx);
2612                    lowPriorityIndex.unload(tx);
2613    
2614                    tx.free(lowPriorityIndex.getPageId());
2615                }
2616                if (highPriorityIndex != null) {
2617                    highPriorityIndex.clear(tx);
2618                    highPriorityIndex.unload(tx);
2619                    tx.free(highPriorityIndex.getPageId());
2620                }
2621            }
2622    
2623            void resetCursorPosition() {
2624                this.cursor.reset();
2625                lastDefaultKey = null;
2626                lastHighKey = null;
2627                lastLowKey = null;
2628            }
2629    
2630            void setBatch(Transaction tx, Long sequence) throws IOException {
2631                if (sequence != null) {
2632                    Long nextPosition = new Long(sequence.longValue() + 1);
2633                    if (defaultPriorityIndex.containsKey(tx, sequence)) {
2634                        lastDefaultKey = sequence;
2635                        cursor.defaultCursorPosition = nextPosition.longValue();
2636                    } else if (highPriorityIndex != null) {
2637                        if (highPriorityIndex.containsKey(tx, sequence)) {
2638                            lastHighKey = sequence;
2639                            cursor.highPriorityCursorPosition = nextPosition.longValue();
2640                        } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2641                            lastLowKey = sequence;
2642                            cursor.lowPriorityCursorPosition = nextPosition.longValue();
2643                        }
2644                    } else {
2645                        LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
2646                        lastDefaultKey = sequence;
2647                        cursor.defaultCursorPosition = nextPosition.longValue();
2648                    }
2649                }
2650            }
2651    
2652            void setBatch(Transaction tx, LastAck last) throws IOException {
2653                setBatch(tx, last.lastAckedSequence);
2654                if (cursor.defaultCursorPosition == 0
2655                        && cursor.highPriorityCursorPosition == 0
2656                        && cursor.lowPriorityCursorPosition == 0) {
2657                    long next = last.lastAckedSequence + 1;
2658                    switch (last.priority) {
2659                        case DEF:
2660                            cursor.defaultCursorPosition = next;
2661                            cursor.highPriorityCursorPosition = next;
2662                            break;
2663                        case HI:
2664                            cursor.highPriorityCursorPosition = next;
2665                            break;
2666                        case LO:
2667                            cursor.lowPriorityCursorPosition = next;
2668                            cursor.defaultCursorPosition = next;
2669                            cursor.highPriorityCursorPosition = next;
2670                            break;
2671                    }
2672                }
2673            }
2674    
2675            void stoppedIterating() {
2676                if (lastDefaultKey!=null) {
2677                    cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2678                }
2679                if (lastHighKey!=null) {
2680                    cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2681                }
2682                if (lastLowKey!=null) {
2683                    cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2684                }
2685                lastDefaultKey = null;
2686                lastHighKey = null;
2687                lastLowKey = null;
2688            }
2689    
2690            void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2691                    throws IOException {
2692                if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2693                    getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2694                } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2695                    getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2696                } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2697                    getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2698                }
2699            }
2700    
2701            void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2702                    BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2703    
2704                Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2705                deletes.add(iterator.next());
2706            }
2707    
2708            long getNextMessageId(int priority) {
2709                return nextMessageId++;
2710            }
2711    
2712            MessageKeys get(Transaction tx, Long key) throws IOException {
2713                MessageKeys result = defaultPriorityIndex.get(tx, key);
2714                if (result == null) {
2715                    result = highPriorityIndex.get(tx, key);
2716                    if (result == null) {
2717                        result = lowPriorityIndex.get(tx, key);
2718                        lastGetPriority = LO;
2719                    } else {
2720                        lastGetPriority = HI;
2721                    }
2722                } else {
2723                    lastGetPriority = DEF;
2724                }
2725                return result;
2726            }
2727    
2728            MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2729                if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2730                    return defaultPriorityIndex.put(tx, key, value);
2731                } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2732                    return highPriorityIndex.put(tx, key, value);
2733                } else {
2734                    return lowPriorityIndex.put(tx, key, value);
2735                }
2736            }
2737    
2738            Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2739                return new MessageOrderIterator(tx,cursor);
2740            }
2741    
2742            Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2743                return new MessageOrderIterator(tx,m);
2744            }
2745    
2746            public byte lastGetPriority() {
2747                return lastGetPriority;
2748            }
2749    
2750            class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
2751                Iterator<Entry<Long, MessageKeys>>currentIterator;
2752                final Iterator<Entry<Long, MessageKeys>>highIterator;
2753                final Iterator<Entry<Long, MessageKeys>>defaultIterator;
2754                final Iterator<Entry<Long, MessageKeys>>lowIterator;
2755    
2756                MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2757                    this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2758                    if (highPriorityIndex != null) {
2759                        this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2760                    } else {
2761                        this.highIterator = null;
2762                    }
2763                    if (lowPriorityIndex != null) {
2764                        this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2765                    } else {
2766                        this.lowIterator = null;
2767                    }
2768                }
2769    
2770                @Override
2771                public boolean hasNext() {
2772                    if (currentIterator == null) {
2773                        if (highIterator != null) {
2774                            if (highIterator.hasNext()) {
2775                                currentIterator = highIterator;
2776                                return currentIterator.hasNext();
2777                            }
2778                            if (defaultIterator.hasNext()) {
2779                                currentIterator = defaultIterator;
2780                                return currentIterator.hasNext();
2781                            }
2782                            if (lowIterator.hasNext()) {
2783                                currentIterator = lowIterator;
2784                                return currentIterator.hasNext();
2785                            }
2786                            return false;
2787                        } else {
2788                            currentIterator = defaultIterator;
2789                            return currentIterator.hasNext();
2790                        }
2791                    }
2792                    if (highIterator != null) {
2793                        if (currentIterator.hasNext()) {
2794                            return true;
2795                        }
2796                        if (currentIterator == highIterator) {
2797                            if (defaultIterator.hasNext()) {
2798                                currentIterator = defaultIterator;
2799                                return currentIterator.hasNext();
2800                            }
2801                            if (lowIterator.hasNext()) {
2802                                currentIterator = lowIterator;
2803                                return currentIterator.hasNext();
2804                            }
2805                            return false;
2806                        }
2807    
2808                        if (currentIterator == defaultIterator) {
2809                            if (lowIterator.hasNext()) {
2810                                currentIterator = lowIterator;
2811                                return currentIterator.hasNext();
2812                            }
2813                            return false;
2814                        }
2815                    }
2816                    return currentIterator.hasNext();
2817                }
2818    
2819                @Override
2820                public Entry<Long, MessageKeys> next() {
2821                    Entry<Long, MessageKeys> result = currentIterator.next();
2822                    if (result != null) {
2823                        Long key = result.getKey();
2824                        if (highIterator != null) {
2825                            if (currentIterator == defaultIterator) {
2826                                lastDefaultKey = key;
2827                            } else if (currentIterator == highIterator) {
2828                                lastHighKey = key;
2829                            } else {
2830                                lastLowKey = key;
2831                            }
2832                        } else {
2833                            lastDefaultKey = key;
2834                        }
2835                    }
2836                    return result;
2837                }
2838    
2839                @Override
2840                public void remove() {
2841                    throw new UnsupportedOperationException();
2842                }
2843    
2844            }
2845        }
2846    
2847        private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2848            final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2849    
2850            @Override
2851            public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2852                ByteArrayOutputStream baos = new ByteArrayOutputStream();
2853                ObjectOutputStream oout = new ObjectOutputStream(baos);
2854                oout.writeObject(object);
2855                oout.flush();
2856                oout.close();
2857                byte[] data = baos.toByteArray();
2858                dataOut.writeInt(data.length);
2859                dataOut.write(data);
2860            }
2861    
2862            @Override
2863            @SuppressWarnings("unchecked")
2864            public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2865                int dataLen = dataIn.readInt();
2866                byte[] data = new byte[dataLen];
2867                dataIn.readFully(data);
2868                ByteArrayInputStream bais = new ByteArrayInputStream(data);
2869                ObjectInputStream oin = new ObjectInputStream(bais);
2870                try {
2871                    return (HashSet<String>) oin.readObject();
2872                } catch (ClassNotFoundException cfe) {
2873                    IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2874                    ioe.initCause(cfe);
2875                    throw ioe;
2876                }
2877            }
2878        }
2879    }