001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInput;
022    import java.io.DataOutput;
023    import java.io.EOFException;
024    import java.io.File;
025    import java.io.IOException;
026    import java.io.InputStream;
027    import java.io.InterruptedIOException;
028    import java.io.ObjectInputStream;
029    import java.io.ObjectOutputStream;
030    import java.io.OutputStream;
031    import java.util.ArrayList;
032    import java.util.Collection;
033    import java.util.Collections;
034    import java.util.Date;
035    import java.util.HashMap;
036    import java.util.HashSet;
037    import java.util.Iterator;
038    import java.util.LinkedHashMap;
039    import java.util.LinkedHashSet;
040    import java.util.List;
041    import java.util.Map;
042    import java.util.Map.Entry;
043    import java.util.Set;
044    import java.util.SortedSet;
045    import java.util.Stack;
046    import java.util.TreeMap;
047    import java.util.TreeSet;
048    import java.util.concurrent.atomic.AtomicBoolean;
049    import java.util.concurrent.atomic.AtomicLong;
050    import java.util.concurrent.locks.ReentrantReadWriteLock;
051    
052    import org.apache.activemq.ActiveMQMessageAuditNoSync;
053    import org.apache.activemq.broker.BrokerService;
054    import org.apache.activemq.broker.BrokerServiceAware;
055    import org.apache.activemq.command.MessageAck;
056    import org.apache.activemq.command.SubscriptionInfo;
057    import org.apache.activemq.command.TransactionId;
058    import org.apache.activemq.protobuf.Buffer;
059    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
060    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
061    import org.apache.activemq.store.kahadb.data.KahaDestination;
062    import org.apache.activemq.store.kahadb.data.KahaEntryType;
063    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
064    import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
065    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
066    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
067    import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
068    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
069    import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
070    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
071    import org.apache.activemq.util.Callback;
072    import org.apache.activemq.util.IOHelper;
073    import org.apache.activemq.util.ServiceStopper;
074    import org.apache.activemq.util.ServiceSupport;
075    import org.apache.kahadb.index.BTreeIndex;
076    import org.apache.kahadb.index.BTreeVisitor;
077    import org.apache.kahadb.index.ListIndex;
078    import org.apache.kahadb.journal.DataFile;
079    import org.apache.kahadb.journal.Journal;
080    import org.apache.kahadb.journal.Location;
081    import org.apache.kahadb.page.Page;
082    import org.apache.kahadb.page.PageFile;
083    import org.apache.kahadb.page.Transaction;
084    import org.apache.kahadb.util.ByteSequence;
085    import org.apache.kahadb.util.DataByteArrayInputStream;
086    import org.apache.kahadb.util.DataByteArrayOutputStream;
087    import org.apache.kahadb.util.LocationMarshaller;
088    import org.apache.kahadb.util.LockFile;
089    import org.apache.kahadb.util.LongMarshaller;
090    import org.apache.kahadb.util.Marshaller;
091    import org.apache.kahadb.util.Sequence;
092    import org.apache.kahadb.util.SequenceSet;
093    import org.apache.kahadb.util.StringMarshaller;
094    import org.apache.kahadb.util.VariableMarshaller;
095    import org.slf4j.Logger;
096    import org.slf4j.LoggerFactory;
097    
098    public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
099    
    // Broker this store belongs to; the checkpoint worker reports IOExceptions to it.
    protected BrokerService brokerService;

    // Name of the system property that LOG_SLOW_ACCESS_TIME is read from.
    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
    // Value of that system property, read once at class initialization; 0 when the property is not set.
    // NOTE(review): the unit is not visible here (presumably milliseconds) -- confirm at the point of use.
    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
    // Relative directory used as the initial value of the 'directory' field.
    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
    // Shared buffer wrapping a zero-length byte array.
    protected static final Buffer UNMATCHED;
    static {
        UNMATCHED = new Buffer(new byte[]{});
    }
    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
    // NOTE(review): presumably milliseconds (10 seconds) -- confirm where the lock wait is implemented.
    private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;

    // Values stored in Metadata.state.
    static final int CLOSED_STATE = 1;
    static final int OPEN_STATE = 2;
    static final long NOT_ACKED = -1;

    // Index format version written by Metadata.write(); metadata without a version field is read as version 1.
    static final int VERSION = 4;
117    
118        protected class Metadata {
119            protected Page<Metadata> page;
120            protected int state;
121            protected BTreeIndex<String, StoredDestination> destinations;
122            protected Location lastUpdate;
123            protected Location firstInProgressTransactionLocation;
124            protected Location producerSequenceIdTrackerLocation = null;
125            protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
126            protected int version = VERSION;
127            public void read(DataInput is) throws IOException {
128                state = is.readInt();
129                destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
130                if (is.readBoolean()) {
131                    lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
132                } else {
133                    lastUpdate = null;
134                }
135                if (is.readBoolean()) {
136                    firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
137                } else {
138                    firstInProgressTransactionLocation = null;
139                }
140                try {
141                    if (is.readBoolean()) {
142                        producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
143                    } else {
144                        producerSequenceIdTrackerLocation = null;
145                    }
146                } catch (EOFException expectedOnUpgrade) {
147                }
148                try {
149                   version = is.readInt();
150                } catch (EOFException expectedOnUpgrade) {
151                    version=1;
152                }
153                LOG.info("KahaDB is version " + version);
154            }
155    
156            public void write(DataOutput os) throws IOException {
157                os.writeInt(state);
158                os.writeLong(destinations.getPageId());
159    
160                if (lastUpdate != null) {
161                    os.writeBoolean(true);
162                    LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
163                } else {
164                    os.writeBoolean(false);
165                }
166    
167                if (firstInProgressTransactionLocation != null) {
168                    os.writeBoolean(true);
169                    LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
170                } else {
171                    os.writeBoolean(false);
172                }
173    
174                if (producerSequenceIdTrackerLocation != null) {
175                    os.writeBoolean(true);
176                    LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
177                } else {
178                    os.writeBoolean(false);
179                }
180                os.writeInt(VERSION);
181            }
182        }
183    
184        class MetadataMarshaller extends VariableMarshaller<Metadata> {
185            public Metadata readPayload(DataInput dataIn) throws IOException {
186                Metadata rc = new Metadata();
187                rc.read(dataIn);
188                return rc;
189            }
190    
191            public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
192                object.write(dataOut);
193            }
194        }
195    
    // Index page file; open() resets it to null before rebuilding a corrupted index.
    protected PageFile pageFile;
    // Journal holding the stored commands; load() resets it to null after a purge.
    protected Journal journal;
    // In-memory copy of the index root record; replaced when the page file is loaded and on close().
    protected Metadata metadata = new Metadata();

    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();

    protected boolean failIfDatabaseIsLocked;

    // When true, load() deletes the journal and the page file once and then clears this flag.
    protected boolean deleteAllMessages;
    // Store directory; created by load() if it does not exist.
    protected File directory = DEFAULT_DIRECTORY;
    // Background checkpoint/cleanup worker; created by startCheckpoint(), joined by close().
    protected Thread checkpointThread;
    protected boolean enableJournalDiskSyncs=true;
    protected boolean archiveDataLogs;
    protected File directoryArchive;
    protected AtomicLong journalSize = new AtomicLong(0);
    // Milliseconds between checkpoints performed by the worker; 0 disables periodic checkpoints.
    long checkpointInterval = 5*1000;
    // Milliseconds between cleanup passes performed by the worker; 0 disables periodic cleanup.
    long cleanupInterval = 30*1000;
    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    boolean enableIndexWriteAsync = false;
    // NOTE(review): the field name looks like a setter name; presumably the index write batch size -- confirm.
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;

    // True between a successful open() and close(); the checkpoint worker loops while it is set.
    protected AtomicBoolean opened = new AtomicBoolean();
    private LockFile lockFile;
    // Consulted by recoverIndex() when index entries reference missing journal data.
    private boolean ignoreMissingJournalfiles = false;
    private int indexCacheSize = 10000;
    // When true, recoverIndex() also treats corrupted journal blocks as missing data.
    private boolean checkForCorruptJournalFiles = false;
    private boolean checksumJournalFiles = false;
    protected boolean forceRecoverIndex = false;
    // Guards creation and joining of checkpointThread.
    private final Object checkpointThreadLock = new Object();
    private boolean rewriteOnRedelivery = false;
    // When true, open() archives an unreadable index instead of deleting it.
    private boolean archiveCorruptedIndex = false;
    private boolean useIndexLFRUEviction = false;
    private float indexLFUEvictionFactor = 0.2f;
    private boolean enableIndexDiskSyncs = true;
    private boolean enableIndexRecoveryFile = true;
    private boolean enableIndexPageCaching = true;
233    
    /**
     * Creates a database with the default field values. The constructor
     * itself does no work; the store is only opened through
     * {@link #doStart()}, {@link #load()} or {@link #open()}.
     */
    public MessageDatabase() {
    }
236    
    /**
     * Service start hook: delegates to {@link #load()}.
     *
     * @throws Exception whatever {@link #load()} throws
     */
    @Override
    public void doStart() throws Exception {
        load();
    }
241    
    /**
     * Service stop hook: delegates to {@link #unload()}.
     *
     * @param stopper not used by this implementation
     * @throws Exception whatever {@link #unload()} throws
     */
    @Override
    public void doStop(ServiceStopper stopper) throws Exception {
        unload();
    }
246    
    /**
     * Loads the index page file and the in-memory view of every stored
     * destination, holding the index write lock for the whole operation.
     * <p>
     * For an empty page file the metadata record and the destinations B-tree
     * are allocated and stored; otherwise the metadata record is read back
     * from page 0. Afterwards {@code storedDestinations} is rebuilt from the
     * destinations index and the page file is flushed.
     *
     * @throws IOException if the page file cannot be loaded, read or written
     */
    private void loadPageFile() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            // Local alias (shadows the field) so the anonymous closures below can capture it.
            final PageFile pageFile = getPageFile();
            pageFile.load();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    if (pageFile.getPageCount() == 0) {
                        // First time this is created.. Initialize the metadata
                        // The first allocation must yield page 0, because the else-branch reads the metadata from page 0.
                        Page<Metadata> page = tx.allocate();
                        assert page.getPageId() == 0;
                        page.set(metadata);
                        metadata.page = page;
                        metadata.state = CLOSED_STATE;
                        // A second page is allocated as the root of the destinations B-tree.
                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());

                        tx.store(metadata.page, metadataMarshaller, true);
                    } else {
                        // Existing store: replace the in-memory metadata with the persisted record.
                        Page<Metadata> page = tx.load(0, metadataMarshaller);
                        metadata = page.get();
                        metadata.page = page;
                    }
                    // Marshallers must be set before the index is loaded.
                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
                    metadata.destinations.load(tx);
                }
            });
            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
            // Perhaps we should just keep an index of file
            storedDestinations.clear();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
                        Entry<String, StoredDestination> entry = iterator.next();
                        // The third argument is true when the persisted destination carries a subscriptions index.
                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
                        storedDestinations.put(entry.getKey(), sd);
                    }
                }
            });
            pageFile.flush();
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
291    
292        private void startCheckpoint() {
293            if (checkpointInterval == 0 &&  cleanupInterval == 0) {
294                LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
295                return;
296            }
297            synchronized (checkpointThreadLock) {
298                boolean start = false;
299                if (checkpointThread == null) {
300                    start = true;
301                } else if (!checkpointThread.isAlive()) {
302                    start = true;
303                    LOG.info("KahaDB: Recovering checkpoint thread after death");
304                }
305                if (start) {
306                    checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
307                        @Override
308                        public void run() {
309                            try {
310                                long lastCleanup = System.currentTimeMillis();
311                                long lastCheckpoint = System.currentTimeMillis();
312                                // Sleep for a short time so we can periodically check
313                                // to see if we need to exit this thread.
314                                long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
315                                while (opened.get()) {
316                                    Thread.sleep(sleepTime);
317                                    long now = System.currentTimeMillis();
318                                    if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
319                                        checkpointCleanup(true);
320                                        lastCleanup = now;
321                                        lastCheckpoint = now;
322                                    } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
323                                        checkpointCleanup(false);
324                                        lastCheckpoint = now;
325                                    }
326                                }
327                            } catch (InterruptedException e) {
328                                // Looks like someone really wants us to exit this thread...
329                            } catch (IOException ioe) {
330                                LOG.error("Checkpoint failed", ioe);
331                                brokerService.handleIOException(ioe);
332                            }
333                        }
334                    };
335    
336                    checkpointThread.setDaemon(true);
337                    checkpointThread.start();
338                }
339            }
340        }
341    
342        public void open() throws IOException {
343            if( opened.compareAndSet(false, true) ) {
344                getJournal().start();
345                try {
346                    loadPageFile();
347                } catch (Throwable t) {
348                    LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
349                    if (LOG.isDebugEnabled()) {
350                        LOG.debug("Index load failure", t);
351                    }
352                    // try to recover index
353                    try {
354                        pageFile.unload();
355                    } catch (Exception ignore) {}
356                    if (archiveCorruptedIndex) {
357                        pageFile.archive();
358                    } else {
359                        pageFile.delete();
360                    }
361                    metadata = new Metadata();
362                    pageFile = null;
363                    loadPageFile();
364                }
365                startCheckpoint();
366                recover();
367            }
368        }
369    
370        public void load() throws IOException {
371            this.indexLock.writeLock().lock();
372            IOHelper.mkdirs(directory);
373            try {
374                if (deleteAllMessages) {
375                    getJournal().start();
376                    getJournal().delete();
377                    getJournal().close();
378                    journal = null;
379                    getPageFile().delete();
380                    LOG.info("Persistence store purged.");
381                    deleteAllMessages = false;
382                }
383    
384                open();
385                store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
386            } finally {
387                this.indexLock.writeLock().unlock();
388            }
389        }
390    
391        public void close() throws IOException, InterruptedException {
392            if( opened.compareAndSet(true, false)) {
393                this.indexLock.writeLock().lock();
394                try {
395                    if (metadata.page != null) {
396                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
397                            public void execute(Transaction tx) throws IOException {
398                                checkpointUpdate(tx, true);
399                            }
400                        });
401                    }
402                    pageFile.unload();
403                    metadata = new Metadata();
404                } finally {
405                    this.indexLock.writeLock().unlock();
406                }
407                journal.close();
408                synchronized (checkpointThreadLock) {
409                    if (checkpointThread != null) {
410                        checkpointThread.join();
411                    }
412                }
413            }
414        }
415    
416        public void unload() throws IOException, InterruptedException {
417            this.indexLock.writeLock().lock();
418            try {
419                if( pageFile != null && pageFile.isLoaded() ) {
420                    metadata.state = CLOSED_STATE;
421                    metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
422    
423                    if (metadata.page != null) {
424                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
425                            public void execute(Transaction tx) throws IOException {
426                                tx.store(metadata.page, metadataMarshaller, true);
427                            }
428                        });
429                    }
430                }
431            } finally {
432                this.indexLock.writeLock().unlock();
433            }
434            close();
435        }
436    
437        // public for testing
438        @SuppressWarnings("rawtypes")
439        public Location getFirstInProgressTxLocation() {
440            Location l = null;
441            synchronized (inflightTransactions) {
442                if (!inflightTransactions.isEmpty()) {
443                    for (List<Operation> ops : inflightTransactions.values()) {
444                        if (!ops.isEmpty()) {
445                            l = ops.get(0).getLocation();
446                            break;
447                        }
448                    }
449                }
450                if (!preparedTransactions.isEmpty()) {
451                    for (List<Operation> ops : preparedTransactions.values()) {
452                        if (!ops.isEmpty()) {
453                            Location t = ops.get(0).getLocation();
454                            if (l==null || t.compareTo(l) <= 0) {
455                                l = t;
456                            }
457                            break;
458                        }
459                    }
460                }
461            }
462            return l;
463        }
464    
    /**
     * Brings the index back in line with the journal after the store has
     * been opened. Runs entirely under the index write lock.
     * <p>
     * Steps, in order:
     * <ol>
     * <li>restore the producer audit and determine where journal replay has
     *     to start (the earlier of the audit position and the index's own
     *     recovery position);</li>
     * <li>replay every journal entry from that position to the end of the
     *     journal;</li>
     * <li>run {@link #recoverIndex(Transaction)} to undo index updates that
     *     are not backed by the journal;</li>
     * <li>append a rollback command for every recovered in-flight local
     *     transaction.</li>
     * </ol>
     *
     * @throws IOException if the journal or the index cannot be read or written
     * @throws IllegalStateException declared by the signature; not thrown
     *         directly by the code in this method
     */
    private void recover() throws IllegalStateException, IOException {
        this.indexLock.writeLock().lock();
        try {

            long start = System.currentTimeMillis();
            Location producerAuditPosition = recoverProducerAudit();
            Location lastIndoubtPosition = getRecoveryPosition();

            // Replay must start early enough for both the producer audit and the index.
            Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);

            if (recoveryPosition != null) {
                int redoCounter = 0;
                LOG.info("Recovering from the journal ...");
                // getNextLocation() returns null once the end of the journal is reached.
                while (recoveryPosition != null) {
                    JournalCommand<?> message = load(recoveryPosition);
                    metadata.lastUpdate = recoveryPosition;
                    // NOTE(review): lastIndoubtPosition is presumably used by process() to tell
                    // audit-only replay from index replay -- confirm in process().
                    process(message, recoveryPosition, lastIndoubtPosition);
                    redoCounter++;
                    recoveryPosition = journal.getNextLocation(recoveryPosition);
                     // Progress report every 100000 replayed entries.
                     if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
                         LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries recovered ..");
                     }
                }
                if (LOG.isInfoEnabled()) {
                    long end = System.currentTimeMillis();
                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
                }
            }

            // We may have to undo some index updates.
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    recoverIndex(tx);
                }
            });

            // rollback any recovered inflight local transactions
            // Ids are collected first and the rollback commands are stored in a second loop.
            Set<TransactionId> toRollback = new HashSet<TransactionId>();
            synchronized (inflightTransactions) {
                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
                    TransactionId id = it.next();
                    if (id.isLocalTransaction()) {
                        toRollback.add(id);
                    }
                }
                for (TransactionId tx: toRollback) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
                    }
                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
                }
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
529    
    /**
     * Converts a transaction id into its KahaDB representation by delegating
     * to {@code TransactionIdConversion.convertToLocal}. Marked as unused in
     * this class.
     *
     * @param tx transaction id to convert
     * @return the converted transaction info
     */
    @SuppressWarnings("unused")
    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
        return TransactionIdConversion.convertToLocal(tx);
    }
534    
535        private Location minimum(Location producerAuditPosition,
536                Location lastIndoubtPosition) {
537            Location min = null;
538            if (producerAuditPosition != null) {
539                min = producerAuditPosition;
540                if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
541                    min = lastIndoubtPosition;
542                }
543            } else {
544                min = lastIndoubtPosition;
545            }
546            return min;
547        }
548    
549        private Location recoverProducerAudit() throws IOException {
550            if (metadata.producerSequenceIdTrackerLocation != null) {
551                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
552                try {
553                    ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
554                    metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
555                    return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
556                } catch (Exception e) {
557                    LOG.warn("Cannot recover message audit", e);
558                    return journal.getNextLocation(null);
559                }
560            } else {
561                // got no audit stored so got to recreate via replay from start of the journal
562                return journal.getNextLocation(null);
563            }
564        }
565    
566        protected void recoverIndex(Transaction tx) throws IOException {
567            long start = System.currentTimeMillis();
568            // It is possible index updates got applied before the journal updates..
569            // in that case we need to removed references to messages that are not in the journal
570            final Location lastAppendLocation = journal.getLastAppendLocation();
571            long undoCounter=0;
572    
573            // Go through all the destinations to see if they have messages past the lastAppendLocation
574            for (StoredDestination sd : storedDestinations.values()) {
575    
576                final ArrayList<Long> matches = new ArrayList<Long>();
577                // Find all the Locations that are >= than the last Append Location.
578                sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
579                    @Override
580                    protected void matched(Location key, Long value) {
581                        matches.add(value);
582                    }
583                });
584    
585                for (Long sequenceId : matches) {
586                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
587                    sd.locationIndex.remove(tx, keys.location);
588                    sd.messageIdIndex.remove(tx, keys.messageId);
589                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
590                    undoCounter++;
591                    // TODO: do we need to modify the ack positions for the pub sub case?
592                }
593            }
594    
595            if( undoCounter > 0 ) {
596                // The rolledback operations are basically in flight journal writes.  To avoid getting
597                // these the end user should do sync writes to the journal.
598                if (LOG.isInfoEnabled()) {
599                    long end = System.currentTimeMillis();
600                    LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
601                }
602            }
603    
604            undoCounter = 0;
605            start = System.currentTimeMillis();
606    
607            // Lets be extra paranoid here and verify that all the datafiles being referenced
608            // by the indexes still exists.
609    
610            final SequenceSet ss = new SequenceSet();
611            for (StoredDestination sd : storedDestinations.values()) {
612                // Use a visitor to cut down the number of pages that we load
613                sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
614                    int last=-1;
615    
616                    public boolean isInterestedInKeysBetween(Location first, Location second) {
617                        if( first==null ) {
618                            return !ss.contains(0, second.getDataFileId());
619                        } else if( second==null ) {
620                            return true;
621                        } else {
622                            return !ss.contains(first.getDataFileId(), second.getDataFileId());
623                        }
624                    }
625    
626                    public void visit(List<Location> keys, List<Long> values) {
627                        for (Location l : keys) {
628                            int fileId = l.getDataFileId();
629                            if( last != fileId ) {
630                                ss.add(fileId);
631                                last = fileId;
632                            }
633                        }
634                    }
635    
636                });
637            }
638            HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
639            while (!ss.isEmpty()) {
640                missingJournalFiles.add((int) ss.removeFirst());
641            }
642            missingJournalFiles.removeAll(journal.getFileMap().keySet());
643    
644            if (!missingJournalFiles.isEmpty()) {
645                if (LOG.isInfoEnabled()) {
646                    LOG.info("Some journal files are missing: " + missingJournalFiles);
647                }
648            }
649    
650            ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
651            for (Integer missing : missingJournalFiles) {
652                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
653            }
654    
655            if (checkForCorruptJournalFiles) {
656                Collection<DataFile> dataFiles = journal.getFileMap().values();
657                for (DataFile dataFile : dataFiles) {
658                    int id = dataFile.getDataFileId();
659                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
660                    Sequence seq = dataFile.getCorruptedBlocks().getHead();
661                    while (seq != null) {
662                        missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
663                        seq = seq.getNext();
664                    }
665                }
666            }
667    
668            if (!missingPredicates.isEmpty()) {
669                for (StoredDestination sd : storedDestinations.values()) {
670    
671                    final ArrayList<Long> matches = new ArrayList<Long>();
672                    sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
673                        @Override
674                        protected void matched(Location key, Long value) {
675                            matches.add(value);
676                        }
677                    });
678    
679                    // If somes message references are affected by the missing data files...
680                    if (!matches.isEmpty()) {
681    
682                        // We either 'gracefully' recover dropping the missing messages or
683                        // we error out.
684                        if( ignoreMissingJournalfiles ) {
685                            // Update the index to remove the references to the missing data
686                            for (Long sequenceId : matches) {
687                                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
688                                sd.locationIndex.remove(tx, keys.location);
689                                sd.messageIdIndex.remove(tx, keys.messageId);
690                                undoCounter++;
691                                // TODO: do we need to modify the ack positions for the pub sub case?
692                            }
693    
694                        } else {
695                            throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
696                        }
697                    }
698                }
699            }
700    
701            if( undoCounter > 0 ) {
702                // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
703                // should do sync writes to the journal.
704                if (LOG.isInfoEnabled()) {
705                    long end = System.currentTimeMillis();
706                    LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
707                }
708            }
709        }
710    
711        private Location nextRecoveryPosition;
712        private Location lastRecoveryPosition;
713    
714        public void incrementalRecover() throws IOException {
715            this.indexLock.writeLock().lock();
716            try {
717                if( nextRecoveryPosition == null ) {
718                    if( lastRecoveryPosition==null ) {
719                        nextRecoveryPosition = getRecoveryPosition();
720                    } else {
721                        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
722                    }
723                }
724                while (nextRecoveryPosition != null) {
725                    lastRecoveryPosition = nextRecoveryPosition;
726                    metadata.lastUpdate = lastRecoveryPosition;
727                    JournalCommand<?> message = load(lastRecoveryPosition);
728                    process(message, lastRecoveryPosition, (Runnable)null);
729                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
730                }
731            } finally {
732                this.indexLock.writeLock().unlock();
733            }
734        }
735    
736        public Location getLastUpdatePosition() throws IOException {
737            return metadata.lastUpdate;
738        }
739    
740        private Location getRecoveryPosition() throws IOException {
741    
742            if (!this.forceRecoverIndex) {
743    
744                // If we need to recover the transactions..
745                if (metadata.firstInProgressTransactionLocation != null) {
746                    return metadata.firstInProgressTransactionLocation;
747                }
748    
749                // Perhaps there were no transactions...
750                if( metadata.lastUpdate!=null) {
751                    // Start replay at the record after the last one recorded in the index file.
752                    return journal.getNextLocation(metadata.lastUpdate);
753                }
754            }
755            // This loads the first position.
756            return journal.getNextLocation(null);
757        }
758    
759        protected void checkpointCleanup(final boolean cleanup) throws IOException {
760            long start;
761            this.indexLock.writeLock().lock();
762            try {
763                start = System.currentTimeMillis();
764                if( !opened.get() ) {
765                    return;
766                }
767                pageFile.tx().execute(new Transaction.Closure<IOException>() {
768                    public void execute(Transaction tx) throws IOException {
769                        checkpointUpdate(tx, cleanup);
770                    }
771                });
772            } finally {
773                this.indexLock.writeLock().unlock();
774            }
775    
776            long end = System.currentTimeMillis();
777            if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
778                if (LOG.isInfoEnabled()) {
779                    LOG.info("Slow KahaDB access: cleanup took " + (end - start));
780                }
781            }
782        }
783    
784        public void checkpoint(Callback closure) throws Exception {
785            this.indexLock.writeLock().lock();
786            try {
787                pageFile.tx().execute(new Transaction.Closure<IOException>() {
788                    public void execute(Transaction tx) throws IOException {
789                        checkpointUpdate(tx, false);
790                    }
791                });
792                closure.execute();
793            } finally {
794                this.indexLock.writeLock().unlock();
795            }
796        }
797    
798        public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
799            int size = data.serializedSizeFramed();
800            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
801            os.writeByte(data.type().getNumber());
802            data.writeFramed(os);
803            return os.toByteSequence();
804        }
805    
806        // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
808        // /////////////////////////////////////////////////////////////////
809        public Location store(JournalCommand<?> data) throws IOException {
810            return store(data, false, null,null);
811        }
812    
813        public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
814            return store(data, false, null,null, onJournalStoreComplete);
815        }
816    
817        public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
818            return store(data, sync, before, after, null);
819        }
820    
821        /**
822         * All updated are are funneled through this method. The updates are converted
823         * to a JournalMessage which is logged to the journal and then the data from
824         * the JournalMessage is used to update the index just like it would be done
825         * during a recovery process.
826         */
827        public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
828            if (before != null) {
829                before.run();
830            }
831            try {
832                ByteSequence sequence = toByteSequence(data);
833                long start = System.currentTimeMillis();
834                Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
835                long start2 = System.currentTimeMillis();
836                process(data, location, after);
837                long end = System.currentTimeMillis();
838                if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
839                    if (LOG.isInfoEnabled()) {
840                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
841                    }
842                }
843    
844                if (after != null) {
845                    Runnable afterCompletion = null;
846                    synchronized (orderedTransactionAfters) {
847                        if (!orderedTransactionAfters.empty()) {
848                            afterCompletion = orderedTransactionAfters.pop();
849                        }
850                    }
851                    if (afterCompletion != null) {
852                        afterCompletion.run();
853                    } else {
854                        // non persistent message case
855                        after.run();
856                    }
857                }
858    
859                if (checkpointThread != null && !checkpointThread.isAlive()) {
860                    startCheckpoint();
861                }
862                return location;
863            } catch (IOException ioe) {
864                LOG.error("KahaDB failed to store to Journal", ioe);
865                brokerService.handleIOException(ioe);
866                throw ioe;
867            }
868        }
869    
870        /**
871         * Loads a previously stored JournalMessage
872         *
873         * @param location
874         * @return
875         * @throws IOException
876         */
877        public JournalCommand<?> load(Location location) throws IOException {
878            long start = System.currentTimeMillis();
879            ByteSequence data = journal.read(location);
880            long end = System.currentTimeMillis();
881            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
882                if (LOG.isInfoEnabled()) {
883                    LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
884                }
885            }
886            DataByteArrayInputStream is = new DataByteArrayInputStream(data);
887            byte readByte = is.readByte();
888            KahaEntryType type = KahaEntryType.valueOf(readByte);
889            if( type == null ) {
890                throw new IOException("Could not load journal record. Invalid location: "+location);
891            }
892            JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
893            message.mergeFramed(is);
894            return message;
895        }
896    
897        /**
898         * do minimal recovery till we reach the last inDoubtLocation
899         * @param data
900         * @param location
901         * @param inDoubtlocation
902         * @throws IOException
903         */
904        void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
905            if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
906                process(data, location, (Runnable) null);
907            } else {
908                // just recover producer audit
909                data.visit(new Visitor() {
910                    public void visit(KahaAddMessageCommand command) throws IOException {
911                        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
912                    }
913                });
914            }
915        }
916    
917        // /////////////////////////////////////////////////////////////////
918        // Journaled record processing methods. Once the record is journaled,
919        // these methods handle applying the index updates. These may be called
920        // from the recovery method too so they need to be idempotent
921        // /////////////////////////////////////////////////////////////////
922    
923        void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException {
924            data.visit(new Visitor() {
925                @Override
926                public void visit(KahaAddMessageCommand command) throws IOException {
927                    process(command, location);
928                }
929    
930                @Override
931                public void visit(KahaRemoveMessageCommand command) throws IOException {
932                    process(command, location);
933                }
934    
935                @Override
936                public void visit(KahaPrepareCommand command) throws IOException {
937                    process(command, location);
938                }
939    
940                @Override
941                public void visit(KahaCommitCommand command) throws IOException {
942                    process(command, location, after);
943                }
944    
945                @Override
946                public void visit(KahaRollbackCommand command) throws IOException {
947                    process(command, location);
948                }
949    
950                @Override
951                public void visit(KahaRemoveDestinationCommand command) throws IOException {
952                    process(command, location);
953                }
954    
955                @Override
956                public void visit(KahaSubscriptionCommand command) throws IOException {
957                    process(command, location);
958                }
959    
960                @Override
961                public void visit(KahaProducerAuditCommand command) throws IOException {
962                    processLocation(location);
963                }
964    
965                @Override
966                public void visit(KahaTraceCommand command) {
967                    processLocation(location);
968                }
969            });
970        }
971    
972        @SuppressWarnings("rawtypes")
973        protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
974            if (command.hasTransactionInfo()) {
975                List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
976                inflightTx.add(new AddOpperation(command, location));
977            } else {
978                this.indexLock.writeLock().lock();
979                try {
980                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
981                        public void execute(Transaction tx) throws IOException {
982                            upadateIndex(tx, command, location);
983                        }
984                    });
985                } finally {
986                    this.indexLock.writeLock().unlock();
987                }
988            }
989        }
990    
991        @SuppressWarnings("rawtypes")
992        protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
993            if (command.hasTransactionInfo()) {
994               List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
995               inflightTx.add(new RemoveOpperation(command, location));
996            } else {
997                this.indexLock.writeLock().lock();
998                try {
999                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1000                        public void execute(Transaction tx) throws IOException {
1001                            updateIndex(tx, command, location);
1002                        }
1003                    });
1004                } finally {
1005                    this.indexLock.writeLock().unlock();
1006                }
1007            }
1008        }
1009    
1010        protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1011            this.indexLock.writeLock().lock();
1012            try {
1013                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1014                    public void execute(Transaction tx) throws IOException {
1015                        updateIndex(tx, command, location);
1016                    }
1017                });
1018            } finally {
1019                this.indexLock.writeLock().unlock();
1020            }
1021        }
1022    
1023        protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1024            this.indexLock.writeLock().lock();
1025            try {
1026                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1027                    public void execute(Transaction tx) throws IOException {
1028                        updateIndex(tx, command, location);
1029                    }
1030                });
1031            } finally {
1032                this.indexLock.writeLock().unlock();
1033            }
1034        }
1035    
1036        protected void processLocation(final Location location) {
1037            this.indexLock.writeLock().lock();
1038            try {
1039                metadata.lastUpdate = location;
1040            } finally {
1041                this.indexLock.writeLock().unlock();
1042            }
1043        }
1044    
1045        private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
1046        private void push(Runnable after) {
1047            if (after != null) {
1048                synchronized (orderedTransactionAfters) {
1049                    orderedTransactionAfters.push(after);
1050                }
1051            }
1052        }
1053    
1054        @SuppressWarnings("rawtypes")
1055        protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
1056            TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1057            List<Operation> inflightTx;
1058            synchronized (inflightTransactions) {
1059                inflightTx = inflightTransactions.remove(key);
1060                if (inflightTx == null) {
1061                    inflightTx = preparedTransactions.remove(key);
1062                }
1063            }
1064            if (inflightTx == null) {
1065                if (after != null) {
1066                    // since we don't push this after and we may find another, lets run it now
1067                    after.run();
1068                }
1069                return;
1070            }
1071    
1072            final List<Operation> messagingTx = inflightTx;
1073            this.indexLock.writeLock().lock();
1074            try {
1075                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1076                    public void execute(Transaction tx) throws IOException {
1077                        for (Operation op : messagingTx) {
1078                            op.execute(tx);
1079                        }
1080                    }
1081                });
1082                metadata.lastUpdate = location;
1083                push(after);
1084            } finally {
1085                this.indexLock.writeLock().unlock();
1086            }
1087        }
1088    
1089        @SuppressWarnings("rawtypes")
1090        protected void process(KahaPrepareCommand command, Location location) {
1091            TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1092            synchronized (inflightTransactions) {
1093                List<Operation> tx = inflightTransactions.remove(key);
1094                if (tx != null) {
1095                    preparedTransactions.put(key, tx);
1096                }
1097            }
1098        }
1099    
1100        @SuppressWarnings("rawtypes")
1101        protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1102            TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1103            List<Operation> updates = null;
1104            synchronized (inflightTransactions) {
1105                updates = inflightTransactions.remove(key);
1106                if (updates == null) {
1107                    updates = preparedTransactions.remove(key);
1108                }
1109            }
1110            if (isRewriteOnRedelivery()) {
1111                persistRedeliveryCount(updates);
1112            }
1113        }
1114    
1115        @SuppressWarnings("rawtypes")
1116        private void persistRedeliveryCount(List<Operation> updates)  throws IOException {
1117            if (updates != null) {
1118                for (Operation operation : updates) {
1119                    operation.getCommand().visit(new Visitor() {
1120                        @Override
1121                        public void visit(KahaRemoveMessageCommand command) throws IOException {
1122                            incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
1123                        }
1124                    });
1125                }
1126            }
1127        }
1128    
   /**
    * Hook invoked by {@code persistRedeliveryCount} for each remove-message
    * command of a rolled-back transaction when rewrite-on-redelivery is enabled.
    *
    * @param key         the message id taken from the remove-message command
    * @param destination the destination taken from the remove-message command
    * @throws IOException if the implementation fails
    */
   abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
1130    
1131        // /////////////////////////////////////////////////////////////////
1132        // These methods do the actual index updates.
1133        // /////////////////////////////////////////////////////////////////
1134    
    // Guards index access: the process(...) and checkpoint methods in this class take the
    // write lock around their page-file transactions and metadata.lastUpdate changes.
    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    // Journal data file ids that checkpointUpdate removes from its GC candidate set.
    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1137    
1138        void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1139            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1140    
1141            // Skip adding the message to the index if this is a topic and there are
1142            // no subscriptions.
1143            if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1144                return;
1145            }
1146    
1147            // Add the message.
1148            int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1149            long id = sd.orderIndex.getNextMessageId(priority);
1150            Long previous = sd.locationIndex.put(tx, location, id);
1151            if (previous == null) {
1152                previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1153                if (previous == null) {
1154                    sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1155                    if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1156                        addAckLocationForNewMessage(tx, sd, id);
1157                    }
1158                } else {
1159                    // If the message ID as indexed, then the broker asked us to
1160                    // store a DUP
1161                    // message. Bad BOY! Don't do it, and log a warning.
1162                    LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
1163                    sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1164                    sd.locationIndex.remove(tx, location);
1165                    rollbackStatsOnDuplicate(command.getDestination());
1166                }
1167            } else {
1168                // restore the previous value.. Looks like this was a redo of a
1169                // previously
1170                // added message. We don't want to assign it a new id as the other
1171                // indexes would
1172                // be wrong..
1173                //
1174                sd.locationIndex.put(tx, location, previous);
1175            }
1176            // record this id in any event, initial send or recovery
1177            metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1178            metadata.lastUpdate = location;
1179        }
1180    
    /**
     * Hook invoked by {@code upadateIndex} after a duplicate message add has
     * been rejected and its index entries undone.
     *
     * @param commandDestination the destination of the rejected add-message command
     */
    abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
1182    
1183        void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1184            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1185            if (!command.hasSubscriptionKey()) {
1186    
1187                // In the queue case we just remove the message from the index..
1188                Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1189                if (sequenceId != null) {
1190                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1191                    if (keys != null) {
1192                        sd.locationIndex.remove(tx, keys.location);
1193                        recordAckMessageReferenceLocation(ackLocation, keys.location);
1194                    }  else if (LOG.isDebugEnabled()) {
1195                        LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1196                    }
1197                } else if (LOG.isDebugEnabled()) {
1198                    LOG.debug("message not found in sequence id index: " + command.getMessageId());
1199                }
1200            } else {
1201                // In the topic case we need remove the message once it's been acked
1202                // by all the subs
1203                Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1204    
1205                // Make sure it's a valid message id...
1206                if (sequence != null) {
1207                    String subscriptionKey = command.getSubscriptionKey();
1208                    if (command.getAck() != UNMATCHED) {
1209                        sd.orderIndex.get(tx, sequence);
1210                        byte priority = sd.orderIndex.lastGetPriority();
1211                        sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1212                    }
1213                    // The following method handles deleting un-referenced messages.
1214                    removeAckLocation(tx, sd, subscriptionKey, sequence);
1215                } else if (LOG.isDebugEnabled()) {
1216                    LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1217                }
1218    
1219            }
1220            metadata.lastUpdate = ackLocation;
1221        }
1222    
1223        Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1224        private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1225            Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1226            if (referenceFileIds == null) {
1227                referenceFileIds = new HashSet<Integer>();
1228                referenceFileIds.add(messageLocation.getDataFileId());
1229                ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1230            } else {
1231                Integer id = Integer.valueOf(messageLocation.getDataFileId());
1232                if (!referenceFileIds.contains(id)) {
1233                    referenceFileIds.add(id);
1234                }
1235            }
1236        }
1237    
1238        void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1239            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1240            sd.orderIndex.remove(tx);
1241    
1242            sd.locationIndex.clear(tx);
1243            sd.locationIndex.unload(tx);
1244            tx.free(sd.locationIndex.getPageId());
1245    
1246            sd.messageIdIndex.clear(tx);
1247            sd.messageIdIndex.unload(tx);
1248            tx.free(sd.messageIdIndex.getPageId());
1249    
1250            if (sd.subscriptions != null) {
1251                sd.subscriptions.clear(tx);
1252                sd.subscriptions.unload(tx);
1253                tx.free(sd.subscriptions.getPageId());
1254    
1255                sd.subscriptionAcks.clear(tx);
1256                sd.subscriptionAcks.unload(tx);
1257                tx.free(sd.subscriptionAcks.getPageId());
1258    
1259                sd.ackPositions.clear(tx);
1260                sd.ackPositions.unload(tx);
1261                tx.free(sd.ackPositions.getHeadPageId());
1262            }
1263    
1264            String key = key(command.getDestination());
1265            storedDestinations.remove(key);
1266            metadata.destinations.remove(tx, key);
1267        }
1268    
1269        void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1270            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1271            final String subscriptionKey = command.getSubscriptionKey();
1272    
1273            // If set then we are creating it.. otherwise we are destroying the sub
1274            if (command.hasSubscriptionInfo()) {
1275                sd.subscriptions.put(tx, subscriptionKey, command);
1276                long ackLocation=NOT_ACKED;
1277                if (!command.getRetroactive()) {
1278                    ackLocation = sd.orderIndex.nextMessageId-1;
1279                } else {
1280                    addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1281                }
1282                sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1283                sd.subscriptionCache.add(subscriptionKey);
1284            } else {
1285                // delete the sub...
1286                sd.subscriptions.remove(tx, subscriptionKey);
1287                sd.subscriptionAcks.remove(tx, subscriptionKey);
1288                sd.subscriptionCache.remove(subscriptionKey);
1289                removeAckLocationsForSub(tx, sd, subscriptionKey);
1290    
1291                if (sd.subscriptions.isEmpty(tx)) {
1292                    sd.messageIdIndex.clear(tx);
1293                    sd.locationIndex.clear(tx);
1294                    sd.orderIndex.clear(tx);
1295                }
1296            }
1297        }
1298    
1299        /**
1300         * @param tx
1301         * @throws IOException
1302         */
1303        void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1304            LOG.debug("Checkpoint started.");
1305    
1306            // reflect last update exclusive of current checkpoint
1307            Location firstTxLocation = metadata.lastUpdate;
1308    
1309            metadata.state = OPEN_STATE;
1310            metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1311            metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
1312            tx.store(metadata.page, metadataMarshaller, true);
1313            pageFile.flush();
1314    
1315            if( cleanup ) {
1316    
1317                final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1318                final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1319    
1320                if (LOG.isTraceEnabled()) {
1321                    LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
1322                }
1323    
1324                // Don't GC files under replication
1325                if( journalFilesBeingReplicated!=null ) {
1326                    gcCandidateSet.removeAll(journalFilesBeingReplicated);
1327                }
1328    
1329                if (metadata.producerSequenceIdTrackerLocation != null) {
1330                    gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
1331                }
1332    
1333                // Don't GC files after the first in progress tx
1334                if( metadata.firstInProgressTransactionLocation!=null ) {
1335                    if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
1336                        firstTxLocation = metadata.firstInProgressTransactionLocation;
1337                    }
1338                }
1339    
1340                if( firstTxLocation!=null ) {
1341                    while( !gcCandidateSet.isEmpty() ) {
1342                        Integer last = gcCandidateSet.last();
1343                        if( last >= firstTxLocation.getDataFileId() ) {
1344                            gcCandidateSet.remove(last);
1345                        } else {
1346                            break;
1347                        }
1348                    }
1349                    if (LOG.isTraceEnabled()) {
1350                        LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
1351                    }
1352                }
1353    
1354                // Go through all the destinations to see if any of them can remove GC candidates.
1355                for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1356                    if( gcCandidateSet.isEmpty() ) {
1357                        break;
1358                    }
1359    
1360                    // Use a visitor to cut down the number of pages that we load
1361                    entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1362                        int last=-1;
1363                        public boolean isInterestedInKeysBetween(Location first, Location second) {
1364                            if( first==null ) {
1365                                SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1366                                if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1367                                    subset.remove(second.getDataFileId());
1368                                }
1369                                return !subset.isEmpty();
1370                            } else if( second==null ) {
1371                                SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1372                                if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1373                                    subset.remove(first.getDataFileId());
1374                                }
1375                                return !subset.isEmpty();
1376                            } else {
1377                                SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1378                                if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1379                                    subset.remove(first.getDataFileId());
1380                                }
1381                                if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1382                                    subset.remove(second.getDataFileId());
1383                                }
1384                                return !subset.isEmpty();
1385                            }
1386                        }
1387    
1388                        public void visit(List<Location> keys, List<Long> values) {
1389                            for (Location l : keys) {
1390                                int fileId = l.getDataFileId();
1391                                if( last != fileId ) {
1392                                    gcCandidateSet.remove(fileId);
1393                                    last = fileId;
1394                                }
1395                            }
1396                        }
1397                    });
1398                    if (LOG.isTraceEnabled()) {
1399                        LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1400                    }
1401                }
1402    
1403                // check we are not deleting file with ack for in-use journal files
1404                if (LOG.isTraceEnabled()) {
1405                    LOG.trace("gc candidates: " + gcCandidateSet);
1406                }
1407                final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
1408                Iterator<Integer> candidates = gcCandidateSet.iterator();
1409                while (candidates.hasNext()) {
1410                    Integer candidate = candidates.next();
1411                    Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
1412                    if (referencedFileIds != null) {
1413                        for (Integer referencedFileId : referencedFileIds) {
1414                            if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
1415                                // active file that is not targeted for deletion is referenced so don't delete
1416                                candidates.remove();
1417                                break;
1418                            }
1419                        }
1420                        if (gcCandidateSet.contains(candidate)) {
1421                            ackMessageFileMap.remove(candidate);
1422                        } else {
1423                            if (LOG.isTraceEnabled()) {
1424                                LOG.trace("not removing data file: " + candidate
1425                                        + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1426                            }
1427                        }
1428                    }
1429                }
1430    
1431                if (!gcCandidateSet.isEmpty()) {
1432                    if (LOG.isDebugEnabled()) {
1433                        LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
1434                    }
1435                    journal.removeDataFiles(gcCandidateSet);
1436                }
1437            }
1438    
1439            LOG.debug("Checkpoint done.");
1440        }
1441    
    /**
     * Completion callback that does nothing. {@code checkpointProducerAudit()} passes it to
     * {@code store(...)} when it writes the producer audit to the journal.
     */
    final Runnable nullCompletionCallback = new Runnable() {
        @Override
        public void run() {
            // intentionally empty
        }
    };
1447        private Location checkpointProducerAudit() throws IOException {
1448            ByteArrayOutputStream baos = new ByteArrayOutputStream();
1449            ObjectOutputStream oout = new ObjectOutputStream(baos);
1450            oout.writeObject(metadata.producerSequenceIdTracker);
1451            oout.flush();
1452            oout.close();
1453            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1454            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1455            try {
1456                location.getLatch().await();
1457            } catch (InterruptedException e) {
1458                throw new InterruptedIOException(e.toString());
1459            }
1460            return location;
1461        }
1462    
    /**
     * Returns the set of journal data file ids that are marked as being replicated. The field
     * itself is returned, not a copy. When it is non-null, {@code checkpointUpdate} keeps these
     * files out of journal cleanup.
     *
     * @return the live set of file ids under replication
     *         (NOTE(review): checkpointUpdate null-checks this field, so callers should
     *         presumably expect {@code null} too - confirm against the field's initializer)
     */
    public HashSet<Integer> getJournalFilesBeingReplicated() {
        return journalFilesBeingReplicated;
    }
1466    
1467        // /////////////////////////////////////////////////////////////////
1468        // StoredDestination related implementation methods.
1469        // /////////////////////////////////////////////////////////////////
1470    
    // Cache of loaded destinations, keyed by key(destination). Filled lazily by
    // getStoredDestination() and scanned by checkpointUpdate() during journal cleanup.
    private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1472    
    /**
     * Plain field holder describing one subscription. Judging by the field names it carries the
     * subscription info, the id and location of the last ack, and a cursor location.
     * NOTE(review): nothing in the visible part of this file reads or writes this class -
     * confirm it is still in use.
     */
    class StoredSubscription {
        SubscriptionInfo subscriptionInfo;
        String lastAckId;
        Location lastAckLocation;
        Location cursor;
    }
1479    
1480        static class MessageKeys {
1481            final String messageId;
1482            final Location location;
1483    
1484            public MessageKeys(String messageId, Location location) {
1485                this.messageId=messageId;
1486                this.location=location;
1487            }
1488    
1489            @Override
1490            public String toString() {
1491                return "["+messageId+","+location+"]";
1492            }
1493        }
1494    
1495        static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1496            static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1497    
1498            public MessageKeys readPayload(DataInput dataIn) throws IOException {
1499                return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1500            }
1501    
1502            public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1503                dataOut.writeUTF(object.messageId);
1504                LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1505            }
1506        }
1507    
1508        class LastAck {
1509            long lastAckedSequence;
1510            byte priority;
1511    
1512            public LastAck(LastAck source) {
1513                this.lastAckedSequence = source.lastAckedSequence;
1514                this.priority = source.priority;
1515            }
1516    
1517            public LastAck() {
1518                this.priority = MessageOrderIndex.HI;
1519            }
1520    
1521            public LastAck(long ackLocation) {
1522                this.lastAckedSequence = ackLocation;
1523                this.priority = MessageOrderIndex.LO;
1524            }
1525    
1526            public LastAck(long ackLocation, byte priority) {
1527                this.lastAckedSequence = ackLocation;
1528                this.priority = priority;
1529            }
1530    
1531            public String toString() {
1532                return "[" + lastAckedSequence + ":" + priority + "]";
1533            }
1534        }
1535    
1536        protected class LastAckMarshaller implements Marshaller<LastAck> {
1537    
1538            public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1539                dataOut.writeLong(object.lastAckedSequence);
1540                dataOut.writeByte(object.priority);
1541            }
1542    
1543            public LastAck readPayload(DataInput dataIn) throws IOException {
1544                LastAck lastAcked = new LastAck();
1545                lastAcked.lastAckedSequence = dataIn.readLong();
1546                if (metadata.version >= 3) {
1547                    lastAcked.priority = dataIn.readByte();
1548                }
1549                return lastAcked;
1550            }
1551    
1552            public int getFixedSize() {
1553                return 9;
1554            }
1555    
1556            public LastAck deepCopy(LastAck source) {
1557                return new LastAck(source);
1558            }
1559    
1560            public boolean isDeepCopySupported() {
1561                return true;
1562            }
1563        }
1564    
    /**
     * In-memory handle for the indexes of one destination, plus transient bookkeeping that is
     * rebuilt from those indexes when the destination is loaded.
     */
    class StoredDestination {

        // Message order index; holds the default, low and high priority BTrees.
        MessageOrderIndex orderIndex = new MessageOrderIndex();
        // Keyed by journal location; checkpointUpdate scans the keys to find data files in use.
        // (values are presumably message sequences - TODO confirm)
        BTreeIndex<Location, Long> locationIndex;
        // Keyed by message id. (values are presumably message sequences - TODO confirm)
        BTreeIndex<String, Long> messageIdIndex;

        // These bits are only set for Topics
        // Subscription key -> the command that created the subscription.
        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
        // Subscription key -> last acked position.
        BTreeIndex<String, LastAck> subscriptionAcks;
        // Created empty by loadStoredDestination for topics; not persisted by the marshaller.
        HashMap<String, MessageOrderCursor> subscriptionCursors;
        // Subscription key -> sequences that the subscription has not acked yet.
        ListIndex<String, SequenceSet> ackPositions;

        // Transient data used to track which Messages are no longer needed.
        // Message sequence -> number of subscriptions still referencing it.
        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
        // Keys of the known subscriptions; a LinkedHashSet, so iteration follows insertion order.
        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
    }
1581    
    /**
     * Marshals a {@code StoredDestination} as the page ids of its indexes.
     * <p>
     * Layout written by {@link #writePayload}: default priority index, location index, message
     * id index, a boolean telling whether subscription data follows, then (if so) subscriptions,
     * subscription acks and the head page of the ack positions list, and finally the low and
     * high priority order indexes. {@link #readPayload} also understands older layouts
     * (selected by {@code metadata.version}) and upgrades them while reading.
     */
    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {

        /**
         * Reads a destination record. The indexes are only bound to their page ids here; they
         * are not loaded, except for the ones created by the upgrade paths below.
         */
        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
            final StoredDestination value = new StoredDestination();
            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());

            // Flag written by writePayload: true when subscription data follows.
            if (dataIn.readBoolean()) {
                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
                if (metadata.version >= 4) {
                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
                } else {
                    // upgrade
                    // Before version 4 the ack positions were a BTree of sequence -> subscription
                    // keys; convert it to the per-subscription SequenceSet list used now.
                    // NOTE(review): the old page id is read from dataIn inside the closure, so
                    // the closure has to run before the reads that follow - presumably
                    // execute() runs it synchronously; confirm.
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        public void execute(Transaction tx) throws IOException {
                            BTreeIndex<Long, HashSet<String>> oldAckPositions =
                                new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
                            oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
                            oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
                            oldAckPositions.load(tx);

                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();

                            // Do the initial build of the data in memory before writing into the store
                            // based Ack Positions List to avoid a lot of disk thrashing.
                            Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
                            while (iterator.hasNext()) {
                                Entry<Long, HashSet<String>> entry = iterator.next();

                                for(String subKey : entry.getValue()) {
                                    SequenceSet pendingAcks = temp.get(subKey);
                                    if (pendingAcks == null) {
                                        pendingAcks = new SequenceSet();
                                        temp.put(subKey, pendingAcks);
                                    }

                                    pendingAcks.add(entry.getKey());
                                }
                            }

                            // Now move the pending messages to ack data into the store backed
                            // structure.
                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
                            for(String subscriptionKey : temp.keySet()) {
                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
                            }

                        }
                    });
                }
            }
            if (metadata.version >= 2) {
                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
            } else {
                    // upgrade
                    // Version 1 records have no low/high priority indexes: allocate and load
                    // fresh, empty ones.
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        public void execute(Transaction tx) throws IOException {
                            value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                            value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                            value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                            value.orderIndex.lowPriorityIndex.load(tx);

                            value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                            value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                            value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                            value.orderIndex.highPriorityIndex.load(tx);
                        }
                    });
            }

            return value;
        }

        /**
         * Writes the destination record in the current layout. Subscription data is written
         * only when {@code value.subscriptions} is set.
         */
        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
            dataOut.writeLong(value.locationIndex.getPageId());
            dataOut.writeLong(value.messageIdIndex.getPageId());
            if (value.subscriptions != null) {
                dataOut.writeBoolean(true);
                dataOut.writeLong(value.subscriptions.getPageId());
                dataOut.writeLong(value.subscriptionAcks.getPageId());
                dataOut.writeLong(value.ackPositions.getHeadPageId());
            } else {
                dataOut.writeBoolean(false);
            }
            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
        }
    }
1674    
1675        static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1676            final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1677    
1678            public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1679                KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1680                rc.mergeFramed((InputStream)dataIn);
1681                return rc;
1682            }
1683    
1684            public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1685                object.writeFramed((OutputStream)dataOut);
1686            }
1687        }
1688    
1689        protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1690            String key = key(destination);
1691            StoredDestination rc = storedDestinations.get(key);
1692            if (rc == null) {
1693                boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1694                rc = loadStoredDestination(tx, key, topic);
1695                // Cache it. We may want to remove/unload destinations from the
1696                // cache that are not used for a while
1697                // to reduce memory usage.
1698                storedDestinations.put(key, rc);
1699            }
1700            return rc;
1701        }
1702    
1703        protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1704            String key = key(destination);
1705            StoredDestination rc = storedDestinations.get(key);
1706            if (rc == null && metadata.destinations.containsKey(tx, key)) {
1707                rc = getStoredDestination(destination, tx);
1708            }
1709            return rc;
1710        }
1711    
1712        /**
1713         * @param tx
1714         * @param key
1715         * @param topic
1716         * @return
1717         * @throws IOException
1718         */
1719        private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1720            // Try to load the existing indexes..
1721            StoredDestination rc = metadata.destinations.get(tx, key);
1722            if (rc == null) {
1723                // Brand new destination.. allocate indexes for it.
1724                rc = new StoredDestination();
1725                rc.orderIndex.allocate(tx);
1726                rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1727                rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1728    
1729                if (topic) {
1730                    rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1731                    rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1732                    rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1733                }
1734                metadata.destinations.put(tx, key, rc);
1735            }
1736    
1737            // Configure the marshalers and load.
1738            rc.orderIndex.load(tx);
1739    
1740            // Figure out the next key using the last entry in the destination.
1741            rc.orderIndex.configureLast(tx);
1742    
1743            rc.locationIndex.setKeyMarshaller(org.apache.kahadb.util.LocationMarshaller.INSTANCE);
1744            rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1745            rc.locationIndex.load(tx);
1746    
1747            rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1748            rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1749            rc.messageIdIndex.load(tx);
1750    
1751            // If it was a topic...
1752            if (topic) {
1753    
1754                rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1755                rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1756                rc.subscriptions.load(tx);
1757    
1758                rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1759                rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1760                rc.subscriptionAcks.load(tx);
1761    
1762                rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
1763                rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
1764                rc.ackPositions.load(tx);
1765    
1766                rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1767    
1768                if (metadata.version < 3) {
1769    
1770                    // on upgrade need to fill ackLocation with available messages past last ack
1771                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1772                        Entry<String, LastAck> entry = iterator.next();
1773                        for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1774                                rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1775                            Long sequence = orderIterator.next().getKey();
1776                            addAckLocation(tx, rc, sequence, entry.getKey());
1777                        }
1778                        // modify so it is upgraded
1779                        rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1780                    }
1781                }
1782    
1783                // Configure the message references index
1784                Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
1785                while (subscriptions.hasNext()) {
1786                    Entry<String, SequenceSet> subscription = subscriptions.next();
1787                    SequenceSet pendingAcks = subscription.getValue();
1788                    if (pendingAcks != null && !pendingAcks.isEmpty()) {
1789                        Long lastPendingAck = pendingAcks.getTail().getLast();
1790                        for(Long sequenceId : pendingAcks) {
1791                            Long current = rc.messageReferences.get(sequenceId);
1792                            if (current == null) {
1793                                current = new Long(0);
1794                            }
1795    
1796                            // We always add a trailing empty entry for the next position to start from
1797                            // so we need to ensure we don't count that as a message reference on reload.
1798                            if (!sequenceId.equals(lastPendingAck)) {
1799                                current = current.longValue() + 1;
1800                            }
1801    
1802                            rc.messageReferences.put(sequenceId, current);
1803                        }
1804                    }
1805                }
1806    
1807                // Configure the subscription cache
1808                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1809                    Entry<String, LastAck> entry = iterator.next();
1810                    rc.subscriptionCache.add(entry.getKey());
1811                }
1812    
1813                if (rc.orderIndex.nextMessageId == 0) {
1814                    // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1815                    if (!rc.subscriptionAcks.isEmpty(tx)) {
1816                        for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1817                            Entry<String, LastAck> entry = iterator.next();
1818                            rc.orderIndex.nextMessageId =
1819                                    Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1820                        }
1821                    }
1822                } else {
1823                    // update based on ackPositions for unmatched, last entry is always the next
1824                    if (!rc.messageReferences.isEmpty()) {
1825                        Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
1826                        rc.orderIndex.nextMessageId =
1827                                Math.max(rc.orderIndex.nextMessageId, nextMessageId);
1828                    }
1829                }
1830            }
1831    
1832            if (metadata.version < VERSION) {
1833                // store again after upgrade
1834                metadata.destinations.put(tx, key, rc);
1835            }
1836            return rc;
1837        }
1838    
1839        private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1840            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1841            if (sequences == null) {
1842                sequences = new SequenceSet();
1843                sequences.add(messageSequence);
1844                sd.ackPositions.add(tx, subscriptionKey, sequences);
1845            } else {
1846                sequences.add(messageSequence);
1847                sd.ackPositions.put(tx, subscriptionKey, sequences);
1848            }
1849    
1850            Long count = sd.messageReferences.get(messageSequence);
1851            if (count == null) {
1852                count = Long.valueOf(0L);
1853            }
1854            count = count.longValue() + 1;
1855            sd.messageReferences.put(messageSequence, count);
1856        }
1857    
1858        // new sub is interested in potentially all existing messages
1859        private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1860            SequenceSet allOutstanding = new SequenceSet();
1861            Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
1862            while (iterator.hasNext()) {
1863                SequenceSet set = iterator.next().getValue();
1864                for (Long entry : set) {
1865                    allOutstanding.add(entry);
1866                }
1867            }
1868            sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
1869    
1870            for (Long ackPosition : allOutstanding) {
1871                Long count = sd.messageReferences.get(ackPosition);
1872                count = count.longValue() + 1;
1873                sd.messageReferences.put(ackPosition, count);
1874            }
1875        }
1876    
1877        // on a new message add, all existing subs are interested in this message
1878        private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1879            for(String subscriptionKey : sd.subscriptionCache) {
1880                SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1881                if (sequences == null) {
1882                    sequences = new SequenceSet();
1883                    sequences.add(new Sequence(messageSequence, messageSequence + 1));
1884                    sd.ackPositions.add(tx, subscriptionKey, sequences);
1885                } else {
1886                    sequences.add(new Sequence(messageSequence, messageSequence + 1));
1887                    sd.ackPositions.put(tx, subscriptionKey, sequences);
1888                }
1889    
1890                Long count = sd.messageReferences.get(messageSequence);
1891                if (count == null) {
1892                    count = Long.valueOf(0L);
1893                }
1894                count = count.longValue() + 1;
1895                sd.messageReferences.put(messageSequence, count);
1896                sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
1897            }
1898        }
1899    
1900        private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1901            if (!sd.ackPositions.isEmpty(tx)) {
1902                SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
1903                if (sequences == null || sequences.isEmpty()) {
1904                    return;
1905                }
1906    
1907                ArrayList<Long> unreferenced = new ArrayList<Long>();
1908    
1909                for(Long sequenceId : sequences) {
1910                    Long references = sd.messageReferences.get(sequenceId);
1911                    if (references != null) {
1912                        references = references.longValue() - 1;
1913    
1914                        if (references.longValue() > 0) {
1915                            sd.messageReferences.put(sequenceId, references);
1916                        } else {
1917                            sd.messageReferences.remove(sequenceId);
1918                            unreferenced.add(sequenceId);
1919                        }
1920                    }
1921                }
1922    
1923                for(Long sequenceId : unreferenced) {
1924                    // Find all the entries that need to get deleted.
1925                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1926                    sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
1927    
1928                    // Do the actual deletes.
1929                    for (Entry<Long, MessageKeys> entry : deletes) {
1930                        sd.locationIndex.remove(tx, entry.getValue().location);
1931                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1932                        sd.orderIndex.remove(tx, entry.getKey());
1933                    }
1934                }
1935            }
1936        }
1937    
1938        /**
1939         * @param tx
1940         * @param sd
1941         * @param subscriptionKey
1942         * @param messageSequence
1943         * @throws IOException
1944         */
1945        private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
1946            // Remove the sub from the previous location set..
1947            if (messageSequence != null) {
1948                SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
1949                if (range != null && !range.isEmpty()) {
1950                    range.remove(messageSequence);
1951                    if (!range.isEmpty()) {
1952                        sd.ackPositions.put(tx, subscriptionKey, range);
1953                    } else {
1954                        sd.ackPositions.remove(tx, subscriptionKey);
1955                    }
1956    
1957                    // Check if the message is reference by any other subscription.
1958                    Long count = sd.messageReferences.get(messageSequence);
1959                    if (count != null){
1960                    long references = count.longValue() - 1;
1961                        if (references > 0) {
1962                            sd.messageReferences.put(messageSequence, Long.valueOf(references));
1963                            return;
1964                        } else {
1965                            sd.messageReferences.remove(messageSequence);
1966                        }
1967                    }
1968    
1969                    // Find all the entries that need to get deleted.
1970                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1971                    sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
1972    
1973                    // Do the actual deletes.
1974                    for (Entry<Long, MessageKeys> entry : deletes) {
1975                        sd.locationIndex.remove(tx, entry.getValue().location);
1976                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1977                        sd.orderIndex.remove(tx, entry.getKey());
1978                    }
1979                }
1980            }
1981        }
1982    
1983        public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1984            return sd.subscriptionAcks.get(tx, subscriptionKey);
1985        }
1986    
1987        public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1988            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
1989            if (messageSequences != null) {
1990                long result = messageSequences.rangeSize();
1991                // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
1992                return result > 0 ? result - 1 : 0;
1993            }
1994    
1995            return 0;
1996        }
1997    
1998        private String key(KahaDestination destination) {
1999            return destination.getType().getNumber() + ":" + destination.getName();
2000        }
2001    
2002        // /////////////////////////////////////////////////////////////////
2003        // Transaction related implementation methods.
2004        // /////////////////////////////////////////////////////////////////
    // Operation lists keyed by transaction id. Entries are created on demand by
    // getInflightTx(), which synchronizes on this map while doing so.
    @SuppressWarnings("rawtypes")
    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
    // Operation lists keyed by transaction id; presumably the transactions that
    // have been prepared but not yet completed - confirm against the
    // prepare/commit code, which is outside this section.
    @SuppressWarnings("rawtypes")
    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
    // String form of the last message id of each tracked ack; maintained by
    // trackRecoveredAcks() and forgetRecoveredAcks() under the index write lock.
    protected final Set<String> ackedAndPrepared = new HashSet<String>();
2010    
2011        // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2012        // till then they are skipped by the store.
2013        // 'at most once' XA guarantee
2014        public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2015            this.indexLock.writeLock().lock();
2016            try {
2017                for (MessageAck ack : acks) {
2018                    ackedAndPrepared.add(ack.getLastMessageId().toString());
2019                }
2020            } finally {
2021                this.indexLock.writeLock().unlock();
2022            }
2023        }
2024    
2025        public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
2026            if (acks != null) {
2027                this.indexLock.writeLock().lock();
2028                try {
2029                    for (MessageAck ack : acks) {
2030                        ackedAndPrepared.remove(ack.getLastMessageId().toString());
2031                    }
2032                } finally {
2033                    this.indexLock.writeLock().unlock();
2034                }
2035            }
2036        }
2037    
2038        @SuppressWarnings("rawtypes")
2039        private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
2040            TransactionId key = TransactionIdConversion.convert(info);
2041            List<Operation> tx;
2042            synchronized (inflightTransactions) {
2043                tx = inflightTransactions.get(key);
2044                if (tx == null) {
2045                    tx = Collections.synchronizedList(new ArrayList<Operation>());
2046                    inflightTransactions.put(key, tx);
2047                }
2048            }
2049            return tx;
2050        }
2051    
2052        @SuppressWarnings("unused")
2053        private TransactionId key(KahaTransactionInfo transactionInfo) {
2054            return TransactionIdConversion.convert(transactionInfo);
2055        }
2056    
2057        abstract class Operation <T extends JournalCommand<T>> {
2058            final T command;
2059            final Location location;
2060    
2061            public Operation(T command, Location location) {
2062                this.command = command;
2063                this.location = location;
2064            }
2065    
2066            public Location getLocation() {
2067                return location;
2068            }
2069    
2070            public T getCommand() {
2071                return command;
2072            }
2073    
2074            abstract public void execute(Transaction tx) throws IOException;
2075        }
2076    
2077        class AddOpperation extends Operation<KahaAddMessageCommand> {
2078    
2079            public AddOpperation(KahaAddMessageCommand command, Location location) {
2080                super(command, location);
2081            }
2082    
2083            @Override
2084            public void execute(Transaction tx) throws IOException {
2085                upadateIndex(tx, command, location);
2086            }
2087    
2088        }
2089    
2090        class RemoveOpperation extends Operation<KahaRemoveMessageCommand> {
2091    
2092            public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
2093                super(command, location);
2094            }
2095    
2096            @Override
2097            public void execute(Transaction tx) throws IOException {
2098                updateIndex(tx, command, location);
2099            }
2100        }
2101    
2102        // /////////////////////////////////////////////////////////////////
2103        // Initialization related implementation methods.
2104        // /////////////////////////////////////////////////////////////////
2105    
2106        private PageFile createPageFile() {
2107            PageFile index = new PageFile(directory, "db");
2108            index.setEnableWriteThread(isEnableIndexWriteAsync());
2109            index.setWriteBatchSize(getIndexWriteBatchSize());
2110            index.setPageCacheSize(indexCacheSize);
2111            index.setUseLFRUEviction(isUseIndexLFRUEviction());
2112            index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2113            index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2114            index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2115            index.setEnablePageCaching(isEnableIndexPageCaching());
2116            return index;
2117        }
2118    
2119        private Journal createJournal() throws IOException {
2120            Journal manager = new Journal();
2121            manager.setDirectory(directory);
2122            manager.setMaxFileLength(getJournalMaxFileLength());
2123            manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2124            manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2125            manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2126            manager.setArchiveDataLogs(isArchiveDataLogs());
2127            manager.setSizeAccumulator(journalSize);
2128            manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2129            if (getDirectoryArchive() != null) {
2130                IOHelper.mkdirs(getDirectoryArchive());
2131                manager.setDirectoryArchive(getDirectoryArchive());
2132            }
2133            return manager;
2134        }
2135    
2136        public int getJournalMaxWriteBatchSize() {
2137            return journalMaxWriteBatchSize;
2138        }
2139    
    /**
     * @param journalMaxWriteBatchSize the write batch size that
     *        {@code createJournal()} hands to the journal
     */
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
    }
2143    
2144        public File getDirectory() {
2145            return directory;
2146        }
2147    
    /**
     * @param directory the directory in which {@code createPageFile()} and
     *        {@code createJournal()} place the index and the journal
     */
    public void setDirectory(File directory) {
        this.directory = directory;
    }
2151    
2152        public boolean isDeleteAllMessages() {
2153            return deleteAllMessages;
2154        }
2155    
    /**
     * @param deleteAllMessages the new value of the deleteAllMessages flag
     */
    public void setDeleteAllMessages(boolean deleteAllMessages) {
        this.deleteAllMessages = deleteAllMessages;
    }
2159    
    /**
     * Sets the index write batch size. Note that the backing field is itself
     * named {@code setIndexWriteBatchSize}.
     *
     * @param setIndexWriteBatchSize the write batch size that
     *        {@code createPageFile()} hands to the page file
     */
    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
    }
2163    
2164        public int getIndexWriteBatchSize() {
2165            return setIndexWriteBatchSize;
2166        }
2167    
    /**
     * @param enableIndexWriteAsync the value that {@code createPageFile()}
     *        passes to {@code PageFile.setEnableWriteThread}
     */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }
2171    
2172        boolean isEnableIndexWriteAsync() {
2173            return enableIndexWriteAsync;
2174        }
2175    
2176        public boolean isEnableJournalDiskSyncs() {
2177            return enableJournalDiskSyncs;
2178        }
2179    
    /**
     * @param syncWrites the new value of the journal disk sync flag
     */
    public void setEnableJournalDiskSyncs(boolean syncWrites) {
        this.enableJournalDiskSyncs = syncWrites;
    }
2183    
2184        public long getCheckpointInterval() {
2185            return checkpointInterval;
2186        }
2187    
    /**
     * @param checkpointInterval the new checkpoint interval
     */
    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }
2191    
2192        public long getCleanupInterval() {
2193            return cleanupInterval;
2194        }
2195    
    /**
     * @param cleanupInterval the new cleanup interval
     */
    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }
2199    
    /**
     * @param journalMaxFileLength the maximum file length that
     *        {@code createJournal()} hands to the journal
     */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }
2203    
2204        public int getJournalMaxFileLength() {
2205            return journalMaxFileLength;
2206        }
2207    
    /**
     * Forwards the value to the producer sequence id tracker held in the
     * metadata.
     *
     * @param maxFailoverProducersToTrack the maximum number of producers the
     *        tracker should track
     */
    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
    }
2211    
2212        public int getMaxFailoverProducersToTrack() {
2213            return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
2214        }
2215    
    /**
     * Forwards the value to the producer sequence id tracker held in the
     * metadata.
     *
     * @param failoverProducersAuditDepth the audit depth for the tracker
     */
    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
    }
2219    
2220        public int getFailoverProducersAuditDepth() {
2221            return this.metadata.producerSequenceIdTracker.getAuditDepth();
2222        }
2223    
2224        public PageFile getPageFile() {
2225            if (pageFile == null) {
2226                pageFile = createPageFile();
2227            }
2228            return pageFile;
2229        }
2230    
2231        public Journal getJournal() throws IOException {
2232            if (journal == null) {
2233                journal = createJournal();
2234            }
2235            return journal;
2236        }
2237    
2238        public boolean isFailIfDatabaseIsLocked() {
2239            return failIfDatabaseIsLocked;
2240        }
2241    
    /**
     * @param failIfDatabaseIsLocked the new value of the
     *        failIfDatabaseIsLocked flag
     */
    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
    }
2245    
2246        public boolean isIgnoreMissingJournalfiles() {
2247            return ignoreMissingJournalfiles;
2248        }
2249    
    /**
     * @param ignoreMissingJournalfiles the new value of the
     *        ignoreMissingJournalfiles flag
     */
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
    }
2253    
2254        public int getIndexCacheSize() {
2255            return indexCacheSize;
2256        }
2257    
    /**
     * @param indexCacheSize the page cache size that {@code createPageFile()}
     *        hands to the page file
     */
    public void setIndexCacheSize(int indexCacheSize) {
        this.indexCacheSize = indexCacheSize;
    }
2261    
2262        public boolean isCheckForCorruptJournalFiles() {
2263            return checkForCorruptJournalFiles;
2264        }
2265    
    /**
     * @param checkForCorruptJournalFiles whether the journal should check for
     *        corruption on startup; {@code true} also enables journal
     *        checksums in {@code createJournal()}
     */
    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
    }
2269    
2270        public boolean isChecksumJournalFiles() {
2271            return checksumJournalFiles;
2272        }
2273    
    /**
     * @param checksumJournalFiles the new value of the checksumJournalFiles
     *        flag, which {@code createJournal()} ORs with
     *        checkForCorruptJournalFiles to enable journal checksums
     */
    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        this.checksumJournalFiles = checksumJournalFiles;
    }
2277    
    /**
     * @param brokerService the broker service reference to store
     */
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }
2281    
2282        /**
2283         * @return the archiveDataLogs
2284         */
2285        public boolean isArchiveDataLogs() {
2286            return this.archiveDataLogs;
2287        }
2288    
2289        /**
2290         * @param archiveDataLogs the archiveDataLogs to set
2291         */
2292        public void setArchiveDataLogs(boolean archiveDataLogs) {
2293            this.archiveDataLogs = archiveDataLogs;
2294        }
2295    
2296        /**
2297         * @return the directoryArchive
2298         */
2299        public File getDirectoryArchive() {
2300            return this.directoryArchive;
2301        }
2302    
2303        /**
2304         * @param directoryArchive the directoryArchive to set
2305         */
2306        public void setDirectoryArchive(File directoryArchive) {
2307            this.directoryArchive = directoryArchive;
2308        }
2309    
2310        public boolean isRewriteOnRedelivery() {
2311            return rewriteOnRedelivery;
2312        }
2313    
    /**
     * @param rewriteOnRedelivery the new value of the rewriteOnRedelivery flag
     */
    public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
        this.rewriteOnRedelivery = rewriteOnRedelivery;
    }
2317    
2318        public boolean isArchiveCorruptedIndex() {
2319            return archiveCorruptedIndex;
2320        }
2321    
    /**
     * @param archiveCorruptedIndex the new value of the archiveCorruptedIndex
     *        flag
     */
    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
        this.archiveCorruptedIndex = archiveCorruptedIndex;
    }
2325    
2326        public float getIndexLFUEvictionFactor() {
2327            return indexLFUEvictionFactor;
2328        }
2329    
    /**
     * @param indexLFUEvictionFactor the LFU eviction factor that
     *        {@code createPageFile()} hands to the page file
     */
    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
    }
2333    
2334        public boolean isUseIndexLFRUEviction() {
2335            return useIndexLFRUEviction;
2336        }
2337    
    /**
     * @param useIndexLFRUEviction the flag that {@code createPageFile()}
     *        passes to {@code PageFile.setUseLFRUEviction}
     */
    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
        this.useIndexLFRUEviction = useIndexLFRUEviction;
    }
2341    
    /**
     * @param enableIndexDiskSyncs the flag that {@code createPageFile()}
     *        passes to {@code PageFile.setEnableDiskSyncs}
     */
    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
    }
2345    
    /**
     * @param enableIndexRecoveryFile the flag that {@code createPageFile()}
     *        passes to {@code PageFile.setEnableRecoveryFile}
     */
    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
    }
2349    
    /**
     * @param enableIndexPageCaching the flag that {@code createPageFile()}
     *        passes to {@code PageFile.setEnablePageCaching}
     */
    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
        this.enableIndexPageCaching = enableIndexPageCaching;
    }
2353    
2354        public boolean isEnableIndexDiskSyncs() {
2355            return enableIndexDiskSyncs;
2356        }
2357    
2358        public boolean isEnableIndexRecoveryFile() {
2359            return enableIndexRecoveryFile;
2360        }
2361    
2362        public boolean isEnableIndexPageCaching() {
2363            return enableIndexPageCaching;
2364        }
2365    
2366        // /////////////////////////////////////////////////////////////////
2367        // Internal conversion methods.
2368        // /////////////////////////////////////////////////////////////////
2369    
2370        class MessageOrderCursor{
2371            long defaultCursorPosition;
2372            long lowPriorityCursorPosition;
2373            long highPriorityCursorPosition;
2374            MessageOrderCursor(){
2375            }
2376    
2377            MessageOrderCursor(long position){
2378                this.defaultCursorPosition=position;
2379                this.lowPriorityCursorPosition=position;
2380                this.highPriorityCursorPosition=position;
2381            }
2382    
2383            MessageOrderCursor(MessageOrderCursor other){
2384                this.defaultCursorPosition=other.defaultCursorPosition;
2385                this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2386                this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2387            }
2388    
2389            MessageOrderCursor copy() {
2390                return new MessageOrderCursor(this);
2391            }
2392    
2393            void reset() {
2394                this.defaultCursorPosition=0;
2395                this.highPriorityCursorPosition=0;
2396                this.lowPriorityCursorPosition=0;
2397            }
2398    
2399            void increment() {
2400                if (defaultCursorPosition!=0) {
2401                    defaultCursorPosition++;
2402                }
2403                if (highPriorityCursorPosition!=0) {
2404                    highPriorityCursorPosition++;
2405                }
2406                if (lowPriorityCursorPosition!=0) {
2407                    lowPriorityCursorPosition++;
2408                }
2409            }
2410    
2411            public String toString() {
2412               return "MessageOrderCursor:[def:" + defaultCursorPosition
2413                       + ", low:" + lowPriorityCursorPosition
2414                       + ", high:" +  highPriorityCursorPosition + "]";
2415            }
2416    
2417            public void sync(MessageOrderCursor other) {
2418                this.defaultCursorPosition=other.defaultCursorPosition;
2419                this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2420                this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2421            }
2422        }
2423    
2424        class MessageOrderIndex {
        // Priority markers: get() records one of them in lastGetPriority, and
        // setBatch(tx, LastAck) switches on them via LastAck.priority.
        static final byte HI = 9;
        static final byte LO = 0;
        static final byte DEF = 4;

        // Next sequence handed out by getNextMessageId(); configureLast()
        // derives it from the last key stored in the indexes.
        long nextMessageId;
        // One order index per priority level. allocate() creates the low and
        // high indexes only when metadata.version >= 2, so they may be null.
        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
        BTreeIndex<Long, MessageKeys> highPriorityIndex;
        // Cursor used by iterator(tx) and positioned by the setBatch() methods.
        MessageOrderCursor cursor = new MessageOrderCursor();
        // Last key seen per priority level; stoppedIterating() moves the cursor
        // just past these keys and clears them.
        Long lastDefaultKey;
        Long lastHighKey;
        Long lastLowKey;
        // Set by get(): DEF, HI or LO depending on which index was consulted last.
        byte lastGetPriority;
2438    
2439            MessageKeys remove(Transaction tx, Long key) throws IOException {
2440                MessageKeys result = defaultPriorityIndex.remove(tx, key);
2441                if (result == null && highPriorityIndex!=null) {
2442                    result = highPriorityIndex.remove(tx, key);
2443                    if (result ==null && lowPriorityIndex!=null) {
2444                        result = lowPriorityIndex.remove(tx, key);
2445                    }
2446                }
2447                return result;
2448            }
2449    
2450            void load(Transaction tx) throws IOException {
2451                defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2452                defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2453                defaultPriorityIndex.load(tx);
2454                lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2455                lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2456                lowPriorityIndex.load(tx);
2457                highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2458                highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2459                highPriorityIndex.load(tx);
2460            }
2461    
2462            void allocate(Transaction tx) throws IOException {
2463                defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2464                if (metadata.version >= 2) {
2465                    lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2466                    highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2467                }
2468            }
2469    
2470            void configureLast(Transaction tx) throws IOException {
2471                // Figure out the next key using the last entry in the destination.
2472                if (highPriorityIndex != null) {
2473                    Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2474                    if (lastEntry != null) {
2475                        nextMessageId = lastEntry.getKey() + 1;
2476                    } else {
2477                        lastEntry = defaultPriorityIndex.getLast(tx);
2478                        if (lastEntry != null) {
2479                            nextMessageId = lastEntry.getKey() + 1;
2480                        } else {
2481                            lastEntry = lowPriorityIndex.getLast(tx);
2482                            if (lastEntry != null) {
2483                                nextMessageId = lastEntry.getKey() + 1;
2484                            }
2485                        }
2486                    }
2487                } else {
2488                    Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2489                    if (lastEntry != null) {
2490                        nextMessageId = lastEntry.getKey() + 1;
2491                    }
2492                }
2493            }
2494    
2495            void clear(Transaction tx) throws IOException {
2496                this.remove(tx);
2497                this.resetCursorPosition();
2498                this.allocate(tx);
2499                this.load(tx);
2500                this.configureLast(tx);
2501            }
2502    
2503            void remove(Transaction tx) throws IOException {
2504                defaultPriorityIndex.clear(tx);
2505                defaultPriorityIndex.unload(tx);
2506                tx.free(defaultPriorityIndex.getPageId());
2507                if (lowPriorityIndex != null) {
2508                    lowPriorityIndex.clear(tx);
2509                    lowPriorityIndex.unload(tx);
2510    
2511                    tx.free(lowPriorityIndex.getPageId());
2512                }
2513                if (highPriorityIndex != null) {
2514                    highPriorityIndex.clear(tx);
2515                    highPriorityIndex.unload(tx);
2516                    tx.free(highPriorityIndex.getPageId());
2517                }
2518            }
2519    
2520            void resetCursorPosition() {
2521                this.cursor.reset();
2522                lastDefaultKey = null;
2523                lastHighKey = null;
2524                lastLowKey = null;
2525            }
2526    
2527            void setBatch(Transaction tx, Long sequence) throws IOException {
2528                if (sequence != null) {
2529                    Long nextPosition = new Long(sequence.longValue() + 1);
2530                    if (defaultPriorityIndex.containsKey(tx, sequence)) {
2531                        lastDefaultKey = sequence;
2532                        cursor.defaultCursorPosition = nextPosition.longValue();
2533                    } else if (highPriorityIndex != null) {
2534                        if (highPriorityIndex.containsKey(tx, sequence)) {
2535                            lastHighKey = sequence;
2536                            cursor.highPriorityCursorPosition = nextPosition.longValue();
2537                        } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2538                            lastLowKey = sequence;
2539                            cursor.lowPriorityCursorPosition = nextPosition.longValue();
2540                        }
2541                    } else {
2542                        LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
2543                        lastDefaultKey = sequence;
2544                        cursor.defaultCursorPosition = nextPosition.longValue();
2545                    }
2546                }
2547            }
2548    
2549            void setBatch(Transaction tx, LastAck last) throws IOException {
2550                setBatch(tx, last.lastAckedSequence);
2551                if (cursor.defaultCursorPosition == 0
2552                        && cursor.highPriorityCursorPosition == 0
2553                        && cursor.lowPriorityCursorPosition == 0) {
2554                    long next = last.lastAckedSequence + 1;
2555                    switch (last.priority) {
2556                        case DEF:
2557                            cursor.defaultCursorPosition = next;
2558                            cursor.highPriorityCursorPosition = next;
2559                            break;
2560                        case HI:
2561                            cursor.highPriorityCursorPosition = next;
2562                            break;
2563                        case LO:
2564                            cursor.lowPriorityCursorPosition = next;
2565                            cursor.defaultCursorPosition = next;
2566                            cursor.highPriorityCursorPosition = next;
2567                            break;
2568                    }
2569                }
2570            }
2571    
2572            void stoppedIterating() {
2573                if (lastDefaultKey!=null) {
2574                    cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2575                }
2576                if (lastHighKey!=null) {
2577                    cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2578                }
2579                if (lastLowKey!=null) {
2580                    cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2581                }
2582                lastDefaultKey = null;
2583                lastHighKey = null;
2584                lastLowKey = null;
2585            }
2586    
2587            void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2588                    throws IOException {
2589                if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2590                    getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2591                } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2592                    getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2593                } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2594                    getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2595                }
2596            }
2597    
2598            void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2599                    BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2600    
2601                Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2602                deletes.add(iterator.next());
2603            }
2604    
2605            long getNextMessageId(int priority) {
2606                return nextMessageId++;
2607            }
2608    
2609            MessageKeys get(Transaction tx, Long key) throws IOException {
2610                MessageKeys result = defaultPriorityIndex.get(tx, key);
2611                if (result == null) {
2612                    result = highPriorityIndex.get(tx, key);
2613                    if (result == null) {
2614                        result = lowPriorityIndex.get(tx, key);
2615                        lastGetPriority = LO;
2616                    } else {
2617                        lastGetPriority = HI;
2618                    }
2619                } else {
2620                    lastGetPriority = DEF;
2621                }
2622                return result;
2623            }
2624    
2625            MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2626                if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2627                    return defaultPriorityIndex.put(tx, key, value);
2628                } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2629                    return highPriorityIndex.put(tx, key, value);
2630                } else {
2631                    return lowPriorityIndex.put(tx, key, value);
2632                }
2633            }
2634    
2635            Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2636                return new MessageOrderIterator(tx,cursor);
2637            }
2638    
2639            Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2640                return new MessageOrderIterator(tx,m);
2641            }
2642    
2643            public byte lastGetPriority() {
2644                return lastGetPriority;
2645            }
2646    
2647            class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
2648                Iterator<Entry<Long, MessageKeys>>currentIterator;
2649                final Iterator<Entry<Long, MessageKeys>>highIterator;
2650                final Iterator<Entry<Long, MessageKeys>>defaultIterator;
2651                final Iterator<Entry<Long, MessageKeys>>lowIterator;
2652    
2653                MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2654                    this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2655                    if (highPriorityIndex != null) {
2656                        this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2657                    } else {
2658                        this.highIterator = null;
2659                    }
2660                    if (lowPriorityIndex != null) {
2661                        this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2662                    } else {
2663                        this.lowIterator = null;
2664                    }
2665                }
2666    
2667                public boolean hasNext() {
2668                    if (currentIterator == null) {
2669                        if (highIterator != null) {
2670                            if (highIterator.hasNext()) {
2671                                currentIterator = highIterator;
2672                                return currentIterator.hasNext();
2673                            }
2674                            if (defaultIterator.hasNext()) {
2675                                currentIterator = defaultIterator;
2676                                return currentIterator.hasNext();
2677                            }
2678                            if (lowIterator.hasNext()) {
2679                                currentIterator = lowIterator;
2680                                return currentIterator.hasNext();
2681                            }
2682                            return false;
2683                        } else {
2684                            currentIterator = defaultIterator;
2685                            return currentIterator.hasNext();
2686                        }
2687                    }
2688                    if (highIterator != null) {
2689                        if (currentIterator.hasNext()) {
2690                            return true;
2691                        }
2692                        if (currentIterator == highIterator) {
2693                            if (defaultIterator.hasNext()) {
2694                                currentIterator = defaultIterator;
2695                                return currentIterator.hasNext();
2696                            }
2697                            if (lowIterator.hasNext()) {
2698                                currentIterator = lowIterator;
2699                                return currentIterator.hasNext();
2700                            }
2701                            return false;
2702                        }
2703    
2704                        if (currentIterator == defaultIterator) {
2705                            if (lowIterator.hasNext()) {
2706                                currentIterator = lowIterator;
2707                                return currentIterator.hasNext();
2708                            }
2709                            return false;
2710                        }
2711                    }
2712                    return currentIterator.hasNext();
2713                }
2714    
2715                public Entry<Long, MessageKeys> next() {
2716                    Entry<Long, MessageKeys> result = currentIterator.next();
2717                    if (result != null) {
2718                        Long key = result.getKey();
2719                        if (highIterator != null) {
2720                            if (currentIterator == defaultIterator) {
2721                                lastDefaultKey = key;
2722                            } else if (currentIterator == highIterator) {
2723                                lastHighKey = key;
2724                            } else {
2725                                lastLowKey = key;
2726                            }
2727                        } else {
2728                            lastDefaultKey = key;
2729                        }
2730                    }
2731                    return result;
2732                }
2733    
2734                public void remove() {
2735                    throw new UnsupportedOperationException();
2736                }
2737    
2738            }
2739        }
2740    
2741        private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2742            final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2743    
2744            public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2745                ByteArrayOutputStream baos = new ByteArrayOutputStream();
2746                ObjectOutputStream oout = new ObjectOutputStream(baos);
2747                oout.writeObject(object);
2748                oout.flush();
2749                oout.close();
2750                byte[] data = baos.toByteArray();
2751                dataOut.writeInt(data.length);
2752                dataOut.write(data);
2753            }
2754    
2755            @SuppressWarnings("unchecked")
2756            public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2757                int dataLen = dataIn.readInt();
2758                byte[] data = new byte[dataLen];
2759                dataIn.readFully(data);
2760                ByteArrayInputStream bais = new ByteArrayInputStream(data);
2761                ObjectInputStream oin = new ObjectInputStream(bais);
2762                try {
2763                    return (HashSet<String>) oin.readObject();
2764                } catch (ClassNotFoundException cfe) {
2765                    IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2766                    ioe.initCause(cfe);
2767                    throw ioe;
2768                }
2769            }
2770        }
2771    }