001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.ByteArrayOutputStream;
021 import java.io.DataInput;
022 import java.io.DataOutput;
023 import java.io.EOFException;
024 import java.io.File;
025 import java.io.IOException;
026 import java.io.InputStream;
027 import java.io.InterruptedIOException;
028 import java.io.ObjectInputStream;
029 import java.io.ObjectOutputStream;
030 import java.io.OutputStream;
031 import java.util.ArrayList;
032 import java.util.Collection;
033 import java.util.Collections;
034 import java.util.Date;
035 import java.util.HashMap;
036 import java.util.HashSet;
037 import java.util.Iterator;
038 import java.util.LinkedHashMap;
039 import java.util.LinkedHashSet;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Map.Entry;
043 import java.util.Set;
044 import java.util.SortedSet;
045 import java.util.Stack;
046 import java.util.TreeMap;
047 import java.util.TreeSet;
048 import java.util.concurrent.atomic.AtomicBoolean;
049 import java.util.concurrent.atomic.AtomicLong;
050 import java.util.concurrent.locks.ReentrantReadWriteLock;
051
052 import org.apache.activemq.ActiveMQMessageAuditNoSync;
053 import org.apache.activemq.broker.BrokerService;
054 import org.apache.activemq.broker.BrokerServiceAware;
055 import org.apache.activemq.command.MessageAck;
056 import org.apache.activemq.command.SubscriptionInfo;
057 import org.apache.activemq.command.TransactionId;
058 import org.apache.activemq.protobuf.Buffer;
059 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
060 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
061 import org.apache.activemq.store.kahadb.data.KahaDestination;
062 import org.apache.activemq.store.kahadb.data.KahaEntryType;
063 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
064 import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
065 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
066 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
067 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
068 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
069 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
070 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
071 import org.apache.activemq.util.Callback;
072 import org.apache.activemq.util.IOHelper;
073 import org.apache.activemq.util.ServiceStopper;
074 import org.apache.activemq.util.ServiceSupport;
075 import org.apache.kahadb.index.BTreeIndex;
076 import org.apache.kahadb.index.BTreeVisitor;
077 import org.apache.kahadb.index.ListIndex;
078 import org.apache.kahadb.journal.DataFile;
079 import org.apache.kahadb.journal.Journal;
080 import org.apache.kahadb.journal.Location;
081 import org.apache.kahadb.page.Page;
082 import org.apache.kahadb.page.PageFile;
083 import org.apache.kahadb.page.Transaction;
084 import org.apache.kahadb.util.ByteSequence;
085 import org.apache.kahadb.util.DataByteArrayInputStream;
086 import org.apache.kahadb.util.DataByteArrayOutputStream;
087 import org.apache.kahadb.util.LocationMarshaller;
088 import org.apache.kahadb.util.LockFile;
089 import org.apache.kahadb.util.LongMarshaller;
090 import org.apache.kahadb.util.Marshaller;
091 import org.apache.kahadb.util.Sequence;
092 import org.apache.kahadb.util.SequenceSet;
093 import org.apache.kahadb.util.StringMarshaller;
094 import org.apache.kahadb.util.VariableMarshaller;
095 import org.slf4j.Logger;
096 import org.slf4j.LoggerFactory;
097
098 public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
099
100 protected BrokerService brokerService;
101
102 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
103 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
104 public static final File DEFAULT_DIRECTORY = new File("KahaDB");
105 protected static final Buffer UNMATCHED;
106 static {
107 UNMATCHED = new Buffer(new byte[]{});
108 }
109 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
110 private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
111
112 static final int CLOSED_STATE = 1;
113 static final int OPEN_STATE = 2;
114 static final long NOT_ACKED = -1;
115
116 static final int VERSION = 4;
117
118 protected class Metadata {
119 protected Page<Metadata> page;
120 protected int state;
121 protected BTreeIndex<String, StoredDestination> destinations;
122 protected Location lastUpdate;
123 protected Location firstInProgressTransactionLocation;
124 protected Location producerSequenceIdTrackerLocation = null;
125 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
126 protected int version = VERSION;
127 public void read(DataInput is) throws IOException {
128 state = is.readInt();
129 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
130 if (is.readBoolean()) {
131 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
132 } else {
133 lastUpdate = null;
134 }
135 if (is.readBoolean()) {
136 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
137 } else {
138 firstInProgressTransactionLocation = null;
139 }
140 try {
141 if (is.readBoolean()) {
142 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
143 } else {
144 producerSequenceIdTrackerLocation = null;
145 }
146 } catch (EOFException expectedOnUpgrade) {
147 }
148 try {
149 version = is.readInt();
150 } catch (EOFException expectedOnUpgrade) {
151 version=1;
152 }
153 LOG.info("KahaDB is version " + version);
154 }
155
156 public void write(DataOutput os) throws IOException {
157 os.writeInt(state);
158 os.writeLong(destinations.getPageId());
159
160 if (lastUpdate != null) {
161 os.writeBoolean(true);
162 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
163 } else {
164 os.writeBoolean(false);
165 }
166
167 if (firstInProgressTransactionLocation != null) {
168 os.writeBoolean(true);
169 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
170 } else {
171 os.writeBoolean(false);
172 }
173
174 if (producerSequenceIdTrackerLocation != null) {
175 os.writeBoolean(true);
176 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
177 } else {
178 os.writeBoolean(false);
179 }
180 os.writeInt(VERSION);
181 }
182 }
183
184 class MetadataMarshaller extends VariableMarshaller<Metadata> {
185 public Metadata readPayload(DataInput dataIn) throws IOException {
186 Metadata rc = new Metadata();
187 rc.read(dataIn);
188 return rc;
189 }
190
191 public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
192 object.write(dataOut);
193 }
194 }
195
196 protected PageFile pageFile;
197 protected Journal journal;
198 protected Metadata metadata = new Metadata();
199
200 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
201
202 protected boolean failIfDatabaseIsLocked;
203
204 protected boolean deleteAllMessages;
205 protected File directory = DEFAULT_DIRECTORY;
206 protected Thread checkpointThread;
207 protected boolean enableJournalDiskSyncs=true;
208 protected boolean archiveDataLogs;
209 protected File directoryArchive;
210 protected AtomicLong journalSize = new AtomicLong(0);
211 long checkpointInterval = 5*1000;
212 long cleanupInterval = 30*1000;
213 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
214 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
215 boolean enableIndexWriteAsync = false;
216 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
217
218 protected AtomicBoolean opened = new AtomicBoolean();
219 private LockFile lockFile;
220 private boolean ignoreMissingJournalfiles = false;
221 private int indexCacheSize = 10000;
222 private boolean checkForCorruptJournalFiles = false;
223 private boolean checksumJournalFiles = false;
224 protected boolean forceRecoverIndex = false;
225 private final Object checkpointThreadLock = new Object();
226 private boolean rewriteOnRedelivery = false;
227 private boolean archiveCorruptedIndex = false;
228 private boolean useIndexLFRUEviction = false;
229 private float indexLFUEvictionFactor = 0.2f;
230 private boolean enableIndexDiskSyncs = true;
231 private boolean enableIndexRecoveryFile = true;
232 private boolean enableIndexPageCaching = true;
233
234 public MessageDatabase() {
235 }
236
237 @Override
238 public void doStart() throws Exception {
239 load();
240 }
241
242 @Override
243 public void doStop(ServiceStopper stopper) throws Exception {
244 unload();
245 }
246
247 private void loadPageFile() throws IOException {
248 this.indexLock.writeLock().lock();
249 try {
250 final PageFile pageFile = getPageFile();
251 pageFile.load();
252 pageFile.tx().execute(new Transaction.Closure<IOException>() {
253 public void execute(Transaction tx) throws IOException {
254 if (pageFile.getPageCount() == 0) {
255 // First time this is created.. Initialize the metadata
256 Page<Metadata> page = tx.allocate();
257 assert page.getPageId() == 0;
258 page.set(metadata);
259 metadata.page = page;
260 metadata.state = CLOSED_STATE;
261 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
262
263 tx.store(metadata.page, metadataMarshaller, true);
264 } else {
265 Page<Metadata> page = tx.load(0, metadataMarshaller);
266 metadata = page.get();
267 metadata.page = page;
268 }
269 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
270 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
271 metadata.destinations.load(tx);
272 }
273 });
274 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
275 // Perhaps we should just keep an index of file
276 storedDestinations.clear();
277 pageFile.tx().execute(new Transaction.Closure<IOException>() {
278 public void execute(Transaction tx) throws IOException {
279 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
280 Entry<String, StoredDestination> entry = iterator.next();
281 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
282 storedDestinations.put(entry.getKey(), sd);
283 }
284 }
285 });
286 pageFile.flush();
287 } finally {
288 this.indexLock.writeLock().unlock();
289 }
290 }
291
292 private void startCheckpoint() {
293 if (checkpointInterval == 0 && cleanupInterval == 0) {
294 LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
295 return;
296 }
297 synchronized (checkpointThreadLock) {
298 boolean start = false;
299 if (checkpointThread == null) {
300 start = true;
301 } else if (!checkpointThread.isAlive()) {
302 start = true;
303 LOG.info("KahaDB: Recovering checkpoint thread after death");
304 }
305 if (start) {
306 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
307 @Override
308 public void run() {
309 try {
310 long lastCleanup = System.currentTimeMillis();
311 long lastCheckpoint = System.currentTimeMillis();
312 // Sleep for a short time so we can periodically check
313 // to see if we need to exit this thread.
314 long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
315 while (opened.get()) {
316 Thread.sleep(sleepTime);
317 long now = System.currentTimeMillis();
318 if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
319 checkpointCleanup(true);
320 lastCleanup = now;
321 lastCheckpoint = now;
322 } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
323 checkpointCleanup(false);
324 lastCheckpoint = now;
325 }
326 }
327 } catch (InterruptedException e) {
328 // Looks like someone really wants us to exit this thread...
329 } catch (IOException ioe) {
330 LOG.error("Checkpoint failed", ioe);
331 brokerService.handleIOException(ioe);
332 }
333 }
334 };
335
336 checkpointThread.setDaemon(true);
337 checkpointThread.start();
338 }
339 }
340 }
341
342 public void open() throws IOException {
343 if( opened.compareAndSet(false, true) ) {
344 getJournal().start();
345 try {
346 loadPageFile();
347 } catch (Throwable t) {
348 LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
349 if (LOG.isDebugEnabled()) {
350 LOG.debug("Index load failure", t);
351 }
352 // try to recover index
353 try {
354 pageFile.unload();
355 } catch (Exception ignore) {}
356 if (archiveCorruptedIndex) {
357 pageFile.archive();
358 } else {
359 pageFile.delete();
360 }
361 metadata = new Metadata();
362 pageFile = null;
363 loadPageFile();
364 }
365 startCheckpoint();
366 recover();
367 }
368 }
369
370 public void load() throws IOException {
371 this.indexLock.writeLock().lock();
372 IOHelper.mkdirs(directory);
373 try {
374 if (deleteAllMessages) {
375 getJournal().start();
376 getJournal().delete();
377 getJournal().close();
378 journal = null;
379 getPageFile().delete();
380 LOG.info("Persistence store purged.");
381 deleteAllMessages = false;
382 }
383
384 open();
385 store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
386 } finally {
387 this.indexLock.writeLock().unlock();
388 }
389 }
390
391 public void close() throws IOException, InterruptedException {
392 if( opened.compareAndSet(true, false)) {
393 this.indexLock.writeLock().lock();
394 try {
395 if (metadata.page != null) {
396 pageFile.tx().execute(new Transaction.Closure<IOException>() {
397 public void execute(Transaction tx) throws IOException {
398 checkpointUpdate(tx, true);
399 }
400 });
401 }
402 pageFile.unload();
403 metadata = new Metadata();
404 } finally {
405 this.indexLock.writeLock().unlock();
406 }
407 journal.close();
408 synchronized (checkpointThreadLock) {
409 if (checkpointThread != null) {
410 checkpointThread.join();
411 }
412 }
413 }
414 }
415
416 public void unload() throws IOException, InterruptedException {
417 this.indexLock.writeLock().lock();
418 try {
419 if( pageFile != null && pageFile.isLoaded() ) {
420 metadata.state = CLOSED_STATE;
421 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
422
423 if (metadata.page != null) {
424 pageFile.tx().execute(new Transaction.Closure<IOException>() {
425 public void execute(Transaction tx) throws IOException {
426 tx.store(metadata.page, metadataMarshaller, true);
427 }
428 });
429 }
430 }
431 } finally {
432 this.indexLock.writeLock().unlock();
433 }
434 close();
435 }
436
437 // public for testing
438 @SuppressWarnings("rawtypes")
439 public Location getFirstInProgressTxLocation() {
440 Location l = null;
441 synchronized (inflightTransactions) {
442 if (!inflightTransactions.isEmpty()) {
443 for (List<Operation> ops : inflightTransactions.values()) {
444 if (!ops.isEmpty()) {
445 l = ops.get(0).getLocation();
446 break;
447 }
448 }
449 }
450 if (!preparedTransactions.isEmpty()) {
451 for (List<Operation> ops : preparedTransactions.values()) {
452 if (!ops.isEmpty()) {
453 Location t = ops.get(0).getLocation();
454 if (l==null || t.compareTo(l) <= 0) {
455 l = t;
456 }
457 break;
458 }
459 }
460 }
461 }
462 return l;
463 }
464
465 /**
466 * Move all the messages that were in the journal into long term storage. We
467 * just replay and do a checkpoint.
468 *
469 * @throws IOException
470 * @throws IOException
471 * @throws IllegalStateException
472 */
473 private void recover() throws IllegalStateException, IOException {
474 this.indexLock.writeLock().lock();
475 try {
476
477 long start = System.currentTimeMillis();
478 Location producerAuditPosition = recoverProducerAudit();
479 Location lastIndoubtPosition = getRecoveryPosition();
480
481 Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
482
483 if (recoveryPosition != null) {
484 int redoCounter = 0;
485 LOG.info("Recovering from the journal ...");
486 while (recoveryPosition != null) {
487 JournalCommand<?> message = load(recoveryPosition);
488 metadata.lastUpdate = recoveryPosition;
489 process(message, recoveryPosition, lastIndoubtPosition);
490 redoCounter++;
491 recoveryPosition = journal.getNextLocation(recoveryPosition);
492 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
493 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
494 }
495 }
496 if (LOG.isInfoEnabled()) {
497 long end = System.currentTimeMillis();
498 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
499 }
500 }
501
502 // We may have to undo some index updates.
503 pageFile.tx().execute(new Transaction.Closure<IOException>() {
504 public void execute(Transaction tx) throws IOException {
505 recoverIndex(tx);
506 }
507 });
508
509 // rollback any recovered inflight local transactions
510 Set<TransactionId> toRollback = new HashSet<TransactionId>();
511 synchronized (inflightTransactions) {
512 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
513 TransactionId id = it.next();
514 if (id.isLocalTransaction()) {
515 toRollback.add(id);
516 }
517 }
518 for (TransactionId tx: toRollback) {
519 if (LOG.isDebugEnabled()) {
520 LOG.debug("rolling back recovered indoubt local transaction " + tx);
521 }
522 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
523 }
524 }
525 } finally {
526 this.indexLock.writeLock().unlock();
527 }
528 }
529
530 @SuppressWarnings("unused")
531 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
532 return TransactionIdConversion.convertToLocal(tx);
533 }
534
535 private Location minimum(Location producerAuditPosition,
536 Location lastIndoubtPosition) {
537 Location min = null;
538 if (producerAuditPosition != null) {
539 min = producerAuditPosition;
540 if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
541 min = lastIndoubtPosition;
542 }
543 } else {
544 min = lastIndoubtPosition;
545 }
546 return min;
547 }
548
549 private Location recoverProducerAudit() throws IOException {
550 if (metadata.producerSequenceIdTrackerLocation != null) {
551 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
552 try {
553 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
554 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
555 return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
556 } catch (Exception e) {
557 LOG.warn("Cannot recover message audit", e);
558 return journal.getNextLocation(null);
559 }
560 } else {
561 // got no audit stored so got to recreate via replay from start of the journal
562 return journal.getNextLocation(null);
563 }
564 }
565
566 protected void recoverIndex(Transaction tx) throws IOException {
567 long start = System.currentTimeMillis();
568 // It is possible index updates got applied before the journal updates..
569 // in that case we need to removed references to messages that are not in the journal
570 final Location lastAppendLocation = journal.getLastAppendLocation();
571 long undoCounter=0;
572
573 // Go through all the destinations to see if they have messages past the lastAppendLocation
574 for (StoredDestination sd : storedDestinations.values()) {
575
576 final ArrayList<Long> matches = new ArrayList<Long>();
577 // Find all the Locations that are >= than the last Append Location.
578 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
579 @Override
580 protected void matched(Location key, Long value) {
581 matches.add(value);
582 }
583 });
584
585 for (Long sequenceId : matches) {
586 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
587 sd.locationIndex.remove(tx, keys.location);
588 sd.messageIdIndex.remove(tx, keys.messageId);
589 metadata.producerSequenceIdTracker.rollback(keys.messageId);
590 undoCounter++;
591 // TODO: do we need to modify the ack positions for the pub sub case?
592 }
593 }
594
595 if( undoCounter > 0 ) {
596 // The rolledback operations are basically in flight journal writes. To avoid getting
597 // these the end user should do sync writes to the journal.
598 if (LOG.isInfoEnabled()) {
599 long end = System.currentTimeMillis();
600 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
601 }
602 }
603
604 undoCounter = 0;
605 start = System.currentTimeMillis();
606
607 // Lets be extra paranoid here and verify that all the datafiles being referenced
608 // by the indexes still exists.
609
610 final SequenceSet ss = new SequenceSet();
611 for (StoredDestination sd : storedDestinations.values()) {
612 // Use a visitor to cut down the number of pages that we load
613 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
614 int last=-1;
615
616 public boolean isInterestedInKeysBetween(Location first, Location second) {
617 if( first==null ) {
618 return !ss.contains(0, second.getDataFileId());
619 } else if( second==null ) {
620 return true;
621 } else {
622 return !ss.contains(first.getDataFileId(), second.getDataFileId());
623 }
624 }
625
626 public void visit(List<Location> keys, List<Long> values) {
627 for (Location l : keys) {
628 int fileId = l.getDataFileId();
629 if( last != fileId ) {
630 ss.add(fileId);
631 last = fileId;
632 }
633 }
634 }
635
636 });
637 }
638 HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
639 while (!ss.isEmpty()) {
640 missingJournalFiles.add((int) ss.removeFirst());
641 }
642 missingJournalFiles.removeAll(journal.getFileMap().keySet());
643
644 if (!missingJournalFiles.isEmpty()) {
645 if (LOG.isInfoEnabled()) {
646 LOG.info("Some journal files are missing: " + missingJournalFiles);
647 }
648 }
649
650 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
651 for (Integer missing : missingJournalFiles) {
652 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
653 }
654
655 if (checkForCorruptJournalFiles) {
656 Collection<DataFile> dataFiles = journal.getFileMap().values();
657 for (DataFile dataFile : dataFiles) {
658 int id = dataFile.getDataFileId();
659 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
660 Sequence seq = dataFile.getCorruptedBlocks().getHead();
661 while (seq != null) {
662 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
663 seq = seq.getNext();
664 }
665 }
666 }
667
668 if (!missingPredicates.isEmpty()) {
669 for (StoredDestination sd : storedDestinations.values()) {
670
671 final ArrayList<Long> matches = new ArrayList<Long>();
672 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
673 @Override
674 protected void matched(Location key, Long value) {
675 matches.add(value);
676 }
677 });
678
679 // If somes message references are affected by the missing data files...
680 if (!matches.isEmpty()) {
681
682 // We either 'gracefully' recover dropping the missing messages or
683 // we error out.
684 if( ignoreMissingJournalfiles ) {
685 // Update the index to remove the references to the missing data
686 for (Long sequenceId : matches) {
687 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
688 sd.locationIndex.remove(tx, keys.location);
689 sd.messageIdIndex.remove(tx, keys.messageId);
690 undoCounter++;
691 // TODO: do we need to modify the ack positions for the pub sub case?
692 }
693
694 } else {
695 throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
696 }
697 }
698 }
699 }
700
701 if( undoCounter > 0 ) {
702 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
703 // should do sync writes to the journal.
704 if (LOG.isInfoEnabled()) {
705 long end = System.currentTimeMillis();
706 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
707 }
708 }
709 }
710
711 private Location nextRecoveryPosition;
712 private Location lastRecoveryPosition;
713
714 public void incrementalRecover() throws IOException {
715 this.indexLock.writeLock().lock();
716 try {
717 if( nextRecoveryPosition == null ) {
718 if( lastRecoveryPosition==null ) {
719 nextRecoveryPosition = getRecoveryPosition();
720 } else {
721 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
722 }
723 }
724 while (nextRecoveryPosition != null) {
725 lastRecoveryPosition = nextRecoveryPosition;
726 metadata.lastUpdate = lastRecoveryPosition;
727 JournalCommand<?> message = load(lastRecoveryPosition);
728 process(message, lastRecoveryPosition, (Runnable)null);
729 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
730 }
731 } finally {
732 this.indexLock.writeLock().unlock();
733 }
734 }
735
736 public Location getLastUpdatePosition() throws IOException {
737 return metadata.lastUpdate;
738 }
739
740 private Location getRecoveryPosition() throws IOException {
741
742 if (!this.forceRecoverIndex) {
743
744 // If we need to recover the transactions..
745 if (metadata.firstInProgressTransactionLocation != null) {
746 return metadata.firstInProgressTransactionLocation;
747 }
748
749 // Perhaps there were no transactions...
750 if( metadata.lastUpdate!=null) {
751 // Start replay at the record after the last one recorded in the index file.
752 return journal.getNextLocation(metadata.lastUpdate);
753 }
754 }
755 // This loads the first position.
756 return journal.getNextLocation(null);
757 }
758
759 protected void checkpointCleanup(final boolean cleanup) throws IOException {
760 long start;
761 this.indexLock.writeLock().lock();
762 try {
763 start = System.currentTimeMillis();
764 if( !opened.get() ) {
765 return;
766 }
767 pageFile.tx().execute(new Transaction.Closure<IOException>() {
768 public void execute(Transaction tx) throws IOException {
769 checkpointUpdate(tx, cleanup);
770 }
771 });
772 } finally {
773 this.indexLock.writeLock().unlock();
774 }
775
776 long end = System.currentTimeMillis();
777 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
778 if (LOG.isInfoEnabled()) {
779 LOG.info("Slow KahaDB access: cleanup took " + (end - start));
780 }
781 }
782 }
783
784 public void checkpoint(Callback closure) throws Exception {
785 this.indexLock.writeLock().lock();
786 try {
787 pageFile.tx().execute(new Transaction.Closure<IOException>() {
788 public void execute(Transaction tx) throws IOException {
789 checkpointUpdate(tx, false);
790 }
791 });
792 closure.execute();
793 } finally {
794 this.indexLock.writeLock().unlock();
795 }
796 }
797
798 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
799 int size = data.serializedSizeFramed();
800 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
801 os.writeByte(data.type().getNumber());
802 data.writeFramed(os);
803 return os.toByteSequence();
804 }
805
806 // /////////////////////////////////////////////////////////////////
// Methods called by the broker to update and query the store.
808 // /////////////////////////////////////////////////////////////////
809 public Location store(JournalCommand<?> data) throws IOException {
810 return store(data, false, null,null);
811 }
812
813 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
814 return store(data, false, null,null, onJournalStoreComplete);
815 }
816
817 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
818 return store(data, sync, before, after, null);
819 }
820
821 /**
822 * All updated are are funneled through this method. The updates are converted
823 * to a JournalMessage which is logged to the journal and then the data from
824 * the JournalMessage is used to update the index just like it would be done
825 * during a recovery process.
826 */
827 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
828 if (before != null) {
829 before.run();
830 }
831 try {
832 ByteSequence sequence = toByteSequence(data);
833 long start = System.currentTimeMillis();
834 Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
835 long start2 = System.currentTimeMillis();
836 process(data, location, after);
837 long end = System.currentTimeMillis();
838 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
839 if (LOG.isInfoEnabled()) {
840 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
841 }
842 }
843
844 if (after != null) {
845 Runnable afterCompletion = null;
846 synchronized (orderedTransactionAfters) {
847 if (!orderedTransactionAfters.empty()) {
848 afterCompletion = orderedTransactionAfters.pop();
849 }
850 }
851 if (afterCompletion != null) {
852 afterCompletion.run();
853 } else {
854 // non persistent message case
855 after.run();
856 }
857 }
858
859 if (checkpointThread != null && !checkpointThread.isAlive()) {
860 startCheckpoint();
861 }
862 return location;
863 } catch (IOException ioe) {
864 LOG.error("KahaDB failed to store to Journal", ioe);
865 brokerService.handleIOException(ioe);
866 throw ioe;
867 }
868 }
869
870 /**
871 * Loads a previously stored JournalMessage
872 *
873 * @param location
874 * @return
875 * @throws IOException
876 */
877 public JournalCommand<?> load(Location location) throws IOException {
878 long start = System.currentTimeMillis();
879 ByteSequence data = journal.read(location);
880 long end = System.currentTimeMillis();
881 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
882 if (LOG.isInfoEnabled()) {
883 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
884 }
885 }
886 DataByteArrayInputStream is = new DataByteArrayInputStream(data);
887 byte readByte = is.readByte();
888 KahaEntryType type = KahaEntryType.valueOf(readByte);
889 if( type == null ) {
890 throw new IOException("Could not load journal record. Invalid location: "+location);
891 }
892 JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
893 message.mergeFramed(is);
894 return message;
895 }
896
897 /**
898 * do minimal recovery till we reach the last inDoubtLocation
899 * @param data
900 * @param location
901 * @param inDoubtlocation
902 * @throws IOException
903 */
904 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
905 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
906 process(data, location, (Runnable) null);
907 } else {
908 // just recover producer audit
909 data.visit(new Visitor() {
910 public void visit(KahaAddMessageCommand command) throws IOException {
911 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
912 }
913 });
914 }
915 }
916
917 // /////////////////////////////////////////////////////////////////
918 // Journaled record processing methods. Once the record is journaled,
919 // these methods handle applying the index updates. These may be called
920 // from the recovery method too so they need to be idempotent
921 // /////////////////////////////////////////////////////////////////
922
923 void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException {
924 data.visit(new Visitor() {
925 @Override
926 public void visit(KahaAddMessageCommand command) throws IOException {
927 process(command, location);
928 }
929
930 @Override
931 public void visit(KahaRemoveMessageCommand command) throws IOException {
932 process(command, location);
933 }
934
935 @Override
936 public void visit(KahaPrepareCommand command) throws IOException {
937 process(command, location);
938 }
939
940 @Override
941 public void visit(KahaCommitCommand command) throws IOException {
942 process(command, location, after);
943 }
944
945 @Override
946 public void visit(KahaRollbackCommand command) throws IOException {
947 process(command, location);
948 }
949
950 @Override
951 public void visit(KahaRemoveDestinationCommand command) throws IOException {
952 process(command, location);
953 }
954
955 @Override
956 public void visit(KahaSubscriptionCommand command) throws IOException {
957 process(command, location);
958 }
959
960 @Override
961 public void visit(KahaProducerAuditCommand command) throws IOException {
962 processLocation(location);
963 }
964
965 @Override
966 public void visit(KahaTraceCommand command) {
967 processLocation(location);
968 }
969 });
970 }
971
972 @SuppressWarnings("rawtypes")
973 protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
974 if (command.hasTransactionInfo()) {
975 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
976 inflightTx.add(new AddOpperation(command, location));
977 } else {
978 this.indexLock.writeLock().lock();
979 try {
980 pageFile.tx().execute(new Transaction.Closure<IOException>() {
981 public void execute(Transaction tx) throws IOException {
982 upadateIndex(tx, command, location);
983 }
984 });
985 } finally {
986 this.indexLock.writeLock().unlock();
987 }
988 }
989 }
990
991 @SuppressWarnings("rawtypes")
992 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
993 if (command.hasTransactionInfo()) {
994 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
995 inflightTx.add(new RemoveOpperation(command, location));
996 } else {
997 this.indexLock.writeLock().lock();
998 try {
999 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1000 public void execute(Transaction tx) throws IOException {
1001 updateIndex(tx, command, location);
1002 }
1003 });
1004 } finally {
1005 this.indexLock.writeLock().unlock();
1006 }
1007 }
1008 }
1009
1010 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1011 this.indexLock.writeLock().lock();
1012 try {
1013 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1014 public void execute(Transaction tx) throws IOException {
1015 updateIndex(tx, command, location);
1016 }
1017 });
1018 } finally {
1019 this.indexLock.writeLock().unlock();
1020 }
1021 }
1022
1023 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1024 this.indexLock.writeLock().lock();
1025 try {
1026 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1027 public void execute(Transaction tx) throws IOException {
1028 updateIndex(tx, command, location);
1029 }
1030 });
1031 } finally {
1032 this.indexLock.writeLock().unlock();
1033 }
1034 }
1035
1036 protected void processLocation(final Location location) {
1037 this.indexLock.writeLock().lock();
1038 try {
1039 metadata.lastUpdate = location;
1040 } finally {
1041 this.indexLock.writeLock().unlock();
1042 }
1043 }
1044
1045 private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
1046 private void push(Runnable after) {
1047 if (after != null) {
1048 synchronized (orderedTransactionAfters) {
1049 orderedTransactionAfters.push(after);
1050 }
1051 }
1052 }
1053
1054 @SuppressWarnings("rawtypes")
1055 protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
1056 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1057 List<Operation> inflightTx;
1058 synchronized (inflightTransactions) {
1059 inflightTx = inflightTransactions.remove(key);
1060 if (inflightTx == null) {
1061 inflightTx = preparedTransactions.remove(key);
1062 }
1063 }
1064 if (inflightTx == null) {
1065 if (after != null) {
1066 // since we don't push this after and we may find another, lets run it now
1067 after.run();
1068 }
1069 return;
1070 }
1071
1072 final List<Operation> messagingTx = inflightTx;
1073 this.indexLock.writeLock().lock();
1074 try {
1075 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1076 public void execute(Transaction tx) throws IOException {
1077 for (Operation op : messagingTx) {
1078 op.execute(tx);
1079 }
1080 }
1081 });
1082 metadata.lastUpdate = location;
1083 push(after);
1084 } finally {
1085 this.indexLock.writeLock().unlock();
1086 }
1087 }
1088
1089 @SuppressWarnings("rawtypes")
1090 protected void process(KahaPrepareCommand command, Location location) {
1091 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1092 synchronized (inflightTransactions) {
1093 List<Operation> tx = inflightTransactions.remove(key);
1094 if (tx != null) {
1095 preparedTransactions.put(key, tx);
1096 }
1097 }
1098 }
1099
1100 @SuppressWarnings("rawtypes")
1101 protected void process(KahaRollbackCommand command, Location location) throws IOException {
1102 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1103 List<Operation> updates = null;
1104 synchronized (inflightTransactions) {
1105 updates = inflightTransactions.remove(key);
1106 if (updates == null) {
1107 updates = preparedTransactions.remove(key);
1108 }
1109 }
1110 if (isRewriteOnRedelivery()) {
1111 persistRedeliveryCount(updates);
1112 }
1113 }
1114
1115 @SuppressWarnings("rawtypes")
1116 private void persistRedeliveryCount(List<Operation> updates) throws IOException {
1117 if (updates != null) {
1118 for (Operation operation : updates) {
1119 operation.getCommand().visit(new Visitor() {
1120 @Override
1121 public void visit(KahaRemoveMessageCommand command) throws IOException {
1122 incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
1123 }
1124 });
1125 }
1126 }
1127 }
1128
1129 abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
1130
1131 // /////////////////////////////////////////////////////////////////
1132 // These methods do the actual index updates.
1133 // /////////////////////////////////////////////////////////////////
1134
// Read/write lock for the index; the process(...) methods in this class take
// its write lock around page file transactions and metadata.lastUpdate changes.
protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
// Journal data file ids that checkpoint cleanup removes from its GC candidate
// set; exposed to callers through getJournalFilesBeingReplicated().
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1137
1138 void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1139 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1140
1141 // Skip adding the message to the index if this is a topic and there are
1142 // no subscriptions.
1143 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1144 return;
1145 }
1146
1147 // Add the message.
1148 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1149 long id = sd.orderIndex.getNextMessageId(priority);
1150 Long previous = sd.locationIndex.put(tx, location, id);
1151 if (previous == null) {
1152 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1153 if (previous == null) {
1154 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1155 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1156 addAckLocationForNewMessage(tx, sd, id);
1157 }
1158 } else {
1159 // If the message ID as indexed, then the broker asked us to
1160 // store a DUP
1161 // message. Bad BOY! Don't do it, and log a warning.
1162 LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
1163 sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1164 sd.locationIndex.remove(tx, location);
1165 rollbackStatsOnDuplicate(command.getDestination());
1166 }
1167 } else {
1168 // restore the previous value.. Looks like this was a redo of a
1169 // previously
1170 // added message. We don't want to assign it a new id as the other
1171 // indexes would
1172 // be wrong..
1173 //
1174 sd.locationIndex.put(tx, location, previous);
1175 }
1176 // record this id in any event, initial send or recovery
1177 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1178 metadata.lastUpdate = location;
1179 }
1180
1181 abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
1182
1183 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1184 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1185 if (!command.hasSubscriptionKey()) {
1186
1187 // In the queue case we just remove the message from the index..
1188 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1189 if (sequenceId != null) {
1190 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1191 if (keys != null) {
1192 sd.locationIndex.remove(tx, keys.location);
1193 recordAckMessageReferenceLocation(ackLocation, keys.location);
1194 } else if (LOG.isDebugEnabled()) {
1195 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId());
1196 }
1197 } else if (LOG.isDebugEnabled()) {
1198 LOG.debug("message not found in sequence id index: " + command.getMessageId());
1199 }
1200 } else {
1201 // In the topic case we need remove the message once it's been acked
1202 // by all the subs
1203 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1204
1205 // Make sure it's a valid message id...
1206 if (sequence != null) {
1207 String subscriptionKey = command.getSubscriptionKey();
1208 if (command.getAck() != UNMATCHED) {
1209 sd.orderIndex.get(tx, sequence);
1210 byte priority = sd.orderIndex.lastGetPriority();
1211 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1212 }
1213 // The following method handles deleting un-referenced messages.
1214 removeAckLocation(tx, sd, subscriptionKey, sequence);
1215 } else if (LOG.isDebugEnabled()) {
1216 LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1217 }
1218
1219 }
1220 metadata.lastUpdate = ackLocation;
1221 }
1222
1223 Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1224 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1225 Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1226 if (referenceFileIds == null) {
1227 referenceFileIds = new HashSet<Integer>();
1228 referenceFileIds.add(messageLocation.getDataFileId());
1229 ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1230 } else {
1231 Integer id = Integer.valueOf(messageLocation.getDataFileId());
1232 if (!referenceFileIds.contains(id)) {
1233 referenceFileIds.add(id);
1234 }
1235 }
1236 }
1237
1238 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1239 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1240 sd.orderIndex.remove(tx);
1241
1242 sd.locationIndex.clear(tx);
1243 sd.locationIndex.unload(tx);
1244 tx.free(sd.locationIndex.getPageId());
1245
1246 sd.messageIdIndex.clear(tx);
1247 sd.messageIdIndex.unload(tx);
1248 tx.free(sd.messageIdIndex.getPageId());
1249
1250 if (sd.subscriptions != null) {
1251 sd.subscriptions.clear(tx);
1252 sd.subscriptions.unload(tx);
1253 tx.free(sd.subscriptions.getPageId());
1254
1255 sd.subscriptionAcks.clear(tx);
1256 sd.subscriptionAcks.unload(tx);
1257 tx.free(sd.subscriptionAcks.getPageId());
1258
1259 sd.ackPositions.clear(tx);
1260 sd.ackPositions.unload(tx);
1261 tx.free(sd.ackPositions.getHeadPageId());
1262 }
1263
1264 String key = key(command.getDestination());
1265 storedDestinations.remove(key);
1266 metadata.destinations.remove(tx, key);
1267 }
1268
1269 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1270 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1271 final String subscriptionKey = command.getSubscriptionKey();
1272
1273 // If set then we are creating it.. otherwise we are destroying the sub
1274 if (command.hasSubscriptionInfo()) {
1275 sd.subscriptions.put(tx, subscriptionKey, command);
1276 long ackLocation=NOT_ACKED;
1277 if (!command.getRetroactive()) {
1278 ackLocation = sd.orderIndex.nextMessageId-1;
1279 } else {
1280 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1281 }
1282 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1283 sd.subscriptionCache.add(subscriptionKey);
1284 } else {
1285 // delete the sub...
1286 sd.subscriptions.remove(tx, subscriptionKey);
1287 sd.subscriptionAcks.remove(tx, subscriptionKey);
1288 sd.subscriptionCache.remove(subscriptionKey);
1289 removeAckLocationsForSub(tx, sd, subscriptionKey);
1290
1291 if (sd.subscriptions.isEmpty(tx)) {
1292 sd.messageIdIndex.clear(tx);
1293 sd.locationIndex.clear(tx);
1294 sd.orderIndex.clear(tx);
1295 }
1296 }
1297 }
1298
1299 /**
1300 * @param tx
1301 * @throws IOException
1302 */
1303 void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1304 LOG.debug("Checkpoint started.");
1305
1306 // reflect last update exclusive of current checkpoint
1307 Location firstTxLocation = metadata.lastUpdate;
1308
1309 metadata.state = OPEN_STATE;
1310 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1311 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
1312 tx.store(metadata.page, metadataMarshaller, true);
1313 pageFile.flush();
1314
1315 if( cleanup ) {
1316
1317 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1318 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1319
1320 if (LOG.isTraceEnabled()) {
1321 LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
1322 }
1323
1324 // Don't GC files under replication
1325 if( journalFilesBeingReplicated!=null ) {
1326 gcCandidateSet.removeAll(journalFilesBeingReplicated);
1327 }
1328
1329 if (metadata.producerSequenceIdTrackerLocation != null) {
1330 gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
1331 }
1332
1333 // Don't GC files after the first in progress tx
1334 if( metadata.firstInProgressTransactionLocation!=null ) {
1335 if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
1336 firstTxLocation = metadata.firstInProgressTransactionLocation;
1337 }
1338 }
1339
1340 if( firstTxLocation!=null ) {
1341 while( !gcCandidateSet.isEmpty() ) {
1342 Integer last = gcCandidateSet.last();
1343 if( last >= firstTxLocation.getDataFileId() ) {
1344 gcCandidateSet.remove(last);
1345 } else {
1346 break;
1347 }
1348 }
1349 if (LOG.isTraceEnabled()) {
1350 LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
1351 }
1352 }
1353
1354 // Go through all the destinations to see if any of them can remove GC candidates.
1355 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1356 if( gcCandidateSet.isEmpty() ) {
1357 break;
1358 }
1359
1360 // Use a visitor to cut down the number of pages that we load
1361 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1362 int last=-1;
1363 public boolean isInterestedInKeysBetween(Location first, Location second) {
1364 if( first==null ) {
1365 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1366 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1367 subset.remove(second.getDataFileId());
1368 }
1369 return !subset.isEmpty();
1370 } else if( second==null ) {
1371 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1372 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1373 subset.remove(first.getDataFileId());
1374 }
1375 return !subset.isEmpty();
1376 } else {
1377 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1378 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1379 subset.remove(first.getDataFileId());
1380 }
1381 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1382 subset.remove(second.getDataFileId());
1383 }
1384 return !subset.isEmpty();
1385 }
1386 }
1387
1388 public void visit(List<Location> keys, List<Long> values) {
1389 for (Location l : keys) {
1390 int fileId = l.getDataFileId();
1391 if( last != fileId ) {
1392 gcCandidateSet.remove(fileId);
1393 last = fileId;
1394 }
1395 }
1396 }
1397 });
1398 if (LOG.isTraceEnabled()) {
1399 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1400 }
1401 }
1402
1403 // check we are not deleting file with ack for in-use journal files
1404 if (LOG.isTraceEnabled()) {
1405 LOG.trace("gc candidates: " + gcCandidateSet);
1406 }
1407 final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
1408 Iterator<Integer> candidates = gcCandidateSet.iterator();
1409 while (candidates.hasNext()) {
1410 Integer candidate = candidates.next();
1411 Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
1412 if (referencedFileIds != null) {
1413 for (Integer referencedFileId : referencedFileIds) {
1414 if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
1415 // active file that is not targeted for deletion is referenced so don't delete
1416 candidates.remove();
1417 break;
1418 }
1419 }
1420 if (gcCandidateSet.contains(candidate)) {
1421 ackMessageFileMap.remove(candidate);
1422 } else {
1423 if (LOG.isTraceEnabled()) {
1424 LOG.trace("not removing data file: " + candidate
1425 + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1426 }
1427 }
1428 }
1429 }
1430
1431 if (!gcCandidateSet.isEmpty()) {
1432 if (LOG.isDebugEnabled()) {
1433 LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
1434 }
1435 journal.removeDataFiles(gcCandidateSet);
1436 }
1437 }
1438
1439 LOG.debug("Checkpoint done.");
1440 }
1441
1442 final Runnable nullCompletionCallback = new Runnable() {
1443 @Override
1444 public void run() {
1445 }
1446 };
1447 private Location checkpointProducerAudit() throws IOException {
1448 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1449 ObjectOutputStream oout = new ObjectOutputStream(baos);
1450 oout.writeObject(metadata.producerSequenceIdTracker);
1451 oout.flush();
1452 oout.close();
1453 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1454 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1455 try {
1456 location.getLatch().await();
1457 } catch (InterruptedException e) {
1458 throw new InterruptedIOException(e.toString());
1459 }
1460 return location;
1461 }
1462
1463 public HashSet<Integer> getJournalFilesBeingReplicated() {
1464 return journalFilesBeingReplicated;
1465 }
1466
1467 // /////////////////////////////////////////////////////////////////
1468 // StoredDestination related implementation methods.
1469 // /////////////////////////////////////////////////////////////////
1470
// In-memory cache of loaded destinations, keyed by the string produced by key(destination).
private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1472
// Plain holder for per-subscription state.
// NOTE(review): not referenced in the visible part of this file - field
// meanings below are taken from their names and types only; confirm usage.
class StoredSubscription {
    // the subscription's descriptor
    SubscriptionInfo subscriptionInfo;
    // presumably the id of the last acked message - TODO confirm
    String lastAckId;
    // presumably the journal position of the last ack - TODO confirm
    Location lastAckLocation;
    // presumably the subscription's current read position - TODO confirm
    Location cursor;
}
1479
1480 static class MessageKeys {
1481 final String messageId;
1482 final Location location;
1483
1484 public MessageKeys(String messageId, Location location) {
1485 this.messageId=messageId;
1486 this.location=location;
1487 }
1488
1489 @Override
1490 public String toString() {
1491 return "["+messageId+","+location+"]";
1492 }
1493 }
1494
1495 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1496 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1497
1498 public MessageKeys readPayload(DataInput dataIn) throws IOException {
1499 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1500 }
1501
1502 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1503 dataOut.writeUTF(object.messageId);
1504 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1505 }
1506 }
1507
1508 class LastAck {
1509 long lastAckedSequence;
1510 byte priority;
1511
1512 public LastAck(LastAck source) {
1513 this.lastAckedSequence = source.lastAckedSequence;
1514 this.priority = source.priority;
1515 }
1516
1517 public LastAck() {
1518 this.priority = MessageOrderIndex.HI;
1519 }
1520
1521 public LastAck(long ackLocation) {
1522 this.lastAckedSequence = ackLocation;
1523 this.priority = MessageOrderIndex.LO;
1524 }
1525
1526 public LastAck(long ackLocation, byte priority) {
1527 this.lastAckedSequence = ackLocation;
1528 this.priority = priority;
1529 }
1530
1531 public String toString() {
1532 return "[" + lastAckedSequence + ":" + priority + "]";
1533 }
1534 }
1535
1536 protected class LastAckMarshaller implements Marshaller<LastAck> {
1537
1538 public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1539 dataOut.writeLong(object.lastAckedSequence);
1540 dataOut.writeByte(object.priority);
1541 }
1542
1543 public LastAck readPayload(DataInput dataIn) throws IOException {
1544 LastAck lastAcked = new LastAck();
1545 lastAcked.lastAckedSequence = dataIn.readLong();
1546 if (metadata.version >= 3) {
1547 lastAcked.priority = dataIn.readByte();
1548 }
1549 return lastAcked;
1550 }
1551
1552 public int getFixedSize() {
1553 return 9;
1554 }
1555
1556 public LastAck deepCopy(LastAck source) {
1557 return new LastAck(source);
1558 }
1559
1560 public boolean isDeepCopySupported() {
1561 return true;
1562 }
1563 }
1564
// Index handles and transient bookkeeping for one destination.
class StoredDestination {

    // messages keyed by sequence id, split into default/low/high priority indexes
    MessageOrderIndex orderIndex = new MessageOrderIndex();
    // journal location of a message -> its sequence id
    BTreeIndex<Location, Long> locationIndex;
    // message id -> its sequence id
    BTreeIndex<String, Long> messageIdIndex;

    // These bits are only set for Topics
    // subscription key -> the command that created the subscription
    BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
    // subscription key -> last acked sequence and priority
    BTreeIndex<String, LastAck> subscriptionAcks;
    // subscription key -> cursor; the map is created when a topic is loaded
    HashMap<String, MessageOrderCursor> subscriptionCursors;
    // subscription key -> sequences still awaiting an ack from that subscription
    ListIndex<String, SequenceSet> ackPositions;

    // Transient data used to track which Messages are no longer needed.
    // sequence id -> reference count, rebuilt from ackPositions on load
    final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
    // keys of the known subscriptions, kept in insertion order
    final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
}
1581
1582 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1583
1584 public StoredDestination readPayload(final DataInput dataIn) throws IOException {
1585 final StoredDestination value = new StoredDestination();
1586 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1587 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1588 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1589
1590 if (dataIn.readBoolean()) {
1591 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1592 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
1593 if (metadata.version >= 4) {
1594 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
1595 } else {
1596 // upgrade
1597 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1598 public void execute(Transaction tx) throws IOException {
1599 BTreeIndex<Long, HashSet<String>> oldAckPositions =
1600 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
1601 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1602 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1603 oldAckPositions.load(tx);
1604
1605 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
1606
1607 // Do the initial build of the data in memory before writing into the store
1608 // based Ack Positions List to avoid a lot of disk thrashing.
1609 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
1610 while (iterator.hasNext()) {
1611 Entry<Long, HashSet<String>> entry = iterator.next();
1612
1613 for(String subKey : entry.getValue()) {
1614 SequenceSet pendingAcks = temp.get(subKey);
1615 if (pendingAcks == null) {
1616 pendingAcks = new SequenceSet();
1617 temp.put(subKey, pendingAcks);
1618 }
1619
1620 pendingAcks.add(entry.getKey());
1621 }
1622 }
1623
1624 // Now move the pending messages to ack data into the store backed
1625 // structure.
1626 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1627 for(String subscriptionKey : temp.keySet()) {
1628 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
1629 }
1630
1631 }
1632 });
1633 }
1634 }
1635 if (metadata.version >= 2) {
1636 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1637 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1638 } else {
1639 // upgrade
1640 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1641 public void execute(Transaction tx) throws IOException {
1642 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1643 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1644 value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1645 value.orderIndex.lowPriorityIndex.load(tx);
1646
1647 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1648 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1649 value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1650 value.orderIndex.highPriorityIndex.load(tx);
1651 }
1652 });
1653 }
1654
1655 return value;
1656 }
1657
1658 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
1659 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
1660 dataOut.writeLong(value.locationIndex.getPageId());
1661 dataOut.writeLong(value.messageIdIndex.getPageId());
1662 if (value.subscriptions != null) {
1663 dataOut.writeBoolean(true);
1664 dataOut.writeLong(value.subscriptions.getPageId());
1665 dataOut.writeLong(value.subscriptionAcks.getPageId());
1666 dataOut.writeLong(value.ackPositions.getHeadPageId());
1667 } else {
1668 dataOut.writeBoolean(false);
1669 }
1670 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
1671 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
1672 }
1673 }
1674
1675 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1676 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1677
1678 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1679 KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1680 rc.mergeFramed((InputStream)dataIn);
1681 return rc;
1682 }
1683
1684 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1685 object.writeFramed((OutputStream)dataOut);
1686 }
1687 }
1688
1689 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1690 String key = key(destination);
1691 StoredDestination rc = storedDestinations.get(key);
1692 if (rc == null) {
1693 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1694 rc = loadStoredDestination(tx, key, topic);
1695 // Cache it. We may want to remove/unload destinations from the
1696 // cache that are not used for a while
1697 // to reduce memory usage.
1698 storedDestinations.put(key, rc);
1699 }
1700 return rc;
1701 }
1702
1703 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1704 String key = key(destination);
1705 StoredDestination rc = storedDestinations.get(key);
1706 if (rc == null && metadata.destinations.containsKey(tx, key)) {
1707 rc = getStoredDestination(destination, tx);
1708 }
1709 return rc;
1710 }
1711
1712 /**
1713 * @param tx
1714 * @param key
1715 * @param topic
1716 * @return
1717 * @throws IOException
1718 */
1719 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1720 // Try to load the existing indexes..
1721 StoredDestination rc = metadata.destinations.get(tx, key);
1722 if (rc == null) {
1723 // Brand new destination.. allocate indexes for it.
1724 rc = new StoredDestination();
1725 rc.orderIndex.allocate(tx);
1726 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1727 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1728
1729 if (topic) {
1730 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1731 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1732 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1733 }
1734 metadata.destinations.put(tx, key, rc);
1735 }
1736
1737 // Configure the marshalers and load.
1738 rc.orderIndex.load(tx);
1739
1740 // Figure out the next key using the last entry in the destination.
1741 rc.orderIndex.configureLast(tx);
1742
1743 rc.locationIndex.setKeyMarshaller(org.apache.kahadb.util.LocationMarshaller.INSTANCE);
1744 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1745 rc.locationIndex.load(tx);
1746
1747 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1748 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1749 rc.messageIdIndex.load(tx);
1750
1751 // If it was a topic...
1752 if (topic) {
1753
1754 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1755 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1756 rc.subscriptions.load(tx);
1757
1758 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1759 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1760 rc.subscriptionAcks.load(tx);
1761
1762 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
1763 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
1764 rc.ackPositions.load(tx);
1765
1766 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1767
1768 if (metadata.version < 3) {
1769
1770 // on upgrade need to fill ackLocation with available messages past last ack
1771 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1772 Entry<String, LastAck> entry = iterator.next();
1773 for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1774 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1775 Long sequence = orderIterator.next().getKey();
1776 addAckLocation(tx, rc, sequence, entry.getKey());
1777 }
1778 // modify so it is upgraded
1779 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1780 }
1781 }
1782
1783 // Configure the message references index
1784 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
1785 while (subscriptions.hasNext()) {
1786 Entry<String, SequenceSet> subscription = subscriptions.next();
1787 SequenceSet pendingAcks = subscription.getValue();
1788 if (pendingAcks != null && !pendingAcks.isEmpty()) {
1789 Long lastPendingAck = pendingAcks.getTail().getLast();
1790 for(Long sequenceId : pendingAcks) {
1791 Long current = rc.messageReferences.get(sequenceId);
1792 if (current == null) {
1793 current = new Long(0);
1794 }
1795
1796 // We always add a trailing empty entry for the next position to start from
1797 // so we need to ensure we don't count that as a message reference on reload.
1798 if (!sequenceId.equals(lastPendingAck)) {
1799 current = current.longValue() + 1;
1800 }
1801
1802 rc.messageReferences.put(sequenceId, current);
1803 }
1804 }
1805 }
1806
1807 // Configure the subscription cache
1808 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1809 Entry<String, LastAck> entry = iterator.next();
1810 rc.subscriptionCache.add(entry.getKey());
1811 }
1812
1813 if (rc.orderIndex.nextMessageId == 0) {
1814 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1815 if (!rc.subscriptionAcks.isEmpty(tx)) {
1816 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1817 Entry<String, LastAck> entry = iterator.next();
1818 rc.orderIndex.nextMessageId =
1819 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1820 }
1821 }
1822 } else {
1823 // update based on ackPositions for unmatched, last entry is always the next
1824 if (!rc.messageReferences.isEmpty()) {
1825 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
1826 rc.orderIndex.nextMessageId =
1827 Math.max(rc.orderIndex.nextMessageId, nextMessageId);
1828 }
1829 }
1830 }
1831
1832 if (metadata.version < VERSION) {
1833 // store again after upgrade
1834 metadata.destinations.put(tx, key, rc);
1835 }
1836 return rc;
1837 }
1838
1839 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1840 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1841 if (sequences == null) {
1842 sequences = new SequenceSet();
1843 sequences.add(messageSequence);
1844 sd.ackPositions.add(tx, subscriptionKey, sequences);
1845 } else {
1846 sequences.add(messageSequence);
1847 sd.ackPositions.put(tx, subscriptionKey, sequences);
1848 }
1849
1850 Long count = sd.messageReferences.get(messageSequence);
1851 if (count == null) {
1852 count = Long.valueOf(0L);
1853 }
1854 count = count.longValue() + 1;
1855 sd.messageReferences.put(messageSequence, count);
1856 }
1857
1858 // new sub is interested in potentially all existing messages
1859 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1860 SequenceSet allOutstanding = new SequenceSet();
1861 Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
1862 while (iterator.hasNext()) {
1863 SequenceSet set = iterator.next().getValue();
1864 for (Long entry : set) {
1865 allOutstanding.add(entry);
1866 }
1867 }
1868 sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
1869
1870 for (Long ackPosition : allOutstanding) {
1871 Long count = sd.messageReferences.get(ackPosition);
1872 count = count.longValue() + 1;
1873 sd.messageReferences.put(ackPosition, count);
1874 }
1875 }
1876
1877 // on a new message add, all existing subs are interested in this message
1878 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1879 for(String subscriptionKey : sd.subscriptionCache) {
1880 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1881 if (sequences == null) {
1882 sequences = new SequenceSet();
1883 sequences.add(new Sequence(messageSequence, messageSequence + 1));
1884 sd.ackPositions.add(tx, subscriptionKey, sequences);
1885 } else {
1886 sequences.add(new Sequence(messageSequence, messageSequence + 1));
1887 sd.ackPositions.put(tx, subscriptionKey, sequences);
1888 }
1889
1890 Long count = sd.messageReferences.get(messageSequence);
1891 if (count == null) {
1892 count = Long.valueOf(0L);
1893 }
1894 count = count.longValue() + 1;
1895 sd.messageReferences.put(messageSequence, count);
1896 sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
1897 }
1898 }
1899
1900 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1901 if (!sd.ackPositions.isEmpty(tx)) {
1902 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
1903 if (sequences == null || sequences.isEmpty()) {
1904 return;
1905 }
1906
1907 ArrayList<Long> unreferenced = new ArrayList<Long>();
1908
1909 for(Long sequenceId : sequences) {
1910 Long references = sd.messageReferences.get(sequenceId);
1911 if (references != null) {
1912 references = references.longValue() - 1;
1913
1914 if (references.longValue() > 0) {
1915 sd.messageReferences.put(sequenceId, references);
1916 } else {
1917 sd.messageReferences.remove(sequenceId);
1918 unreferenced.add(sequenceId);
1919 }
1920 }
1921 }
1922
1923 for(Long sequenceId : unreferenced) {
1924 // Find all the entries that need to get deleted.
1925 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1926 sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
1927
1928 // Do the actual deletes.
1929 for (Entry<Long, MessageKeys> entry : deletes) {
1930 sd.locationIndex.remove(tx, entry.getValue().location);
1931 sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1932 sd.orderIndex.remove(tx, entry.getKey());
1933 }
1934 }
1935 }
1936 }
1937
1938 /**
1939 * @param tx
1940 * @param sd
1941 * @param subscriptionKey
1942 * @param messageSequence
1943 * @throws IOException
1944 */
1945 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
1946 // Remove the sub from the previous location set..
1947 if (messageSequence != null) {
1948 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
1949 if (range != null && !range.isEmpty()) {
1950 range.remove(messageSequence);
1951 if (!range.isEmpty()) {
1952 sd.ackPositions.put(tx, subscriptionKey, range);
1953 } else {
1954 sd.ackPositions.remove(tx, subscriptionKey);
1955 }
1956
1957 // Check if the message is reference by any other subscription.
1958 Long count = sd.messageReferences.get(messageSequence);
1959 if (count != null){
1960 long references = count.longValue() - 1;
1961 if (references > 0) {
1962 sd.messageReferences.put(messageSequence, Long.valueOf(references));
1963 return;
1964 } else {
1965 sd.messageReferences.remove(messageSequence);
1966 }
1967 }
1968
1969 // Find all the entries that need to get deleted.
1970 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1971 sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
1972
1973 // Do the actual deletes.
1974 for (Entry<Long, MessageKeys> entry : deletes) {
1975 sd.locationIndex.remove(tx, entry.getValue().location);
1976 sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1977 sd.orderIndex.remove(tx, entry.getKey());
1978 }
1979 }
1980 }
1981 }
1982
/**
 * Looks up the entry stored for a subscription in the destination's subscription ack index.
 *
 * @param tx the index transaction
 * @param sd the destination the subscription belongs to
 * @param subscriptionKey the key of the subscription
 * @return whatever {@code sd.subscriptionAcks} holds for the key
 * @throws IOException if the index cannot be read
 */
public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
    return sd.subscriptionAcks.get(tx, subscriptionKey);
}
1986
1987 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1988 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
1989 if (messageSequences != null) {
1990 long result = messageSequences.rangeSize();
1991 // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
1992 return result > 0 ? result - 1 : 0;
1993 }
1994
1995 return 0;
1996 }
1997
1998 private String key(KahaDestination destination) {
1999 return destination.getType().getNumber() + ":" + destination.getName();
2000 }
2001
2002 // /////////////////////////////////////////////////////////////////
2003 // Transaction related implementation methods.
2004 // /////////////////////////////////////////////////////////////////
// Operations recorded per transaction id; the lists are created lazily by getInflightTx()
// while holding this map's monitor.
@SuppressWarnings("rawtypes")
private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
// NOTE(review): presumably the operations of transactions that reached the prepared state;
// it is populated outside this section - confirm.
@SuppressWarnings("rawtypes")
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
// String form of the last message id of every ack handed to trackRecoveredAcks() and not yet
// released through forgetRecoveredAcks(); both methods mutate it under the index write lock.
protected final Set<String> ackedAndPrepared = new HashSet<String>();
2010
2011 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2012 // till then they are skipped by the store.
2013 // 'at most once' XA guarantee
2014 public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2015 this.indexLock.writeLock().lock();
2016 try {
2017 for (MessageAck ack : acks) {
2018 ackedAndPrepared.add(ack.getLastMessageId().toString());
2019 }
2020 } finally {
2021 this.indexLock.writeLock().unlock();
2022 }
2023 }
2024
2025 public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
2026 if (acks != null) {
2027 this.indexLock.writeLock().lock();
2028 try {
2029 for (MessageAck ack : acks) {
2030 ackedAndPrepared.remove(ack.getLastMessageId().toString());
2031 }
2032 } finally {
2033 this.indexLock.writeLock().unlock();
2034 }
2035 }
2036 }
2037
2038 @SuppressWarnings("rawtypes")
2039 private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
2040 TransactionId key = TransactionIdConversion.convert(info);
2041 List<Operation> tx;
2042 synchronized (inflightTransactions) {
2043 tx = inflightTransactions.get(key);
2044 if (tx == null) {
2045 tx = Collections.synchronizedList(new ArrayList<Operation>());
2046 inflightTransactions.put(key, tx);
2047 }
2048 }
2049 return tx;
2050 }
2051
/**
 * Converts a journal transaction info into the {@link TransactionId} used as map key.
 *
 * @param transactionInfo the transaction info to convert
 * @return the result of {@code TransactionIdConversion.convert(transactionInfo)}
 */
@SuppressWarnings("unused")
private TransactionId key(KahaTransactionInfo transactionInfo) {
    return TransactionIdConversion.convert(transactionInfo);
}
2056
/**
 * A journal command paired with the journal location it is associated with; subclasses define
 * how the command is applied through {@link #execute(Transaction)}.
 *
 * @param <T> the type of journal command carried by the operation
 */
abstract class Operation <T extends JournalCommand<T>> {
    final T command;
    final Location location;

    /**
     * @param command the journal command to carry
     * @param location the journal location associated with the command
     */
    public Operation(T command, Location location) {
        this.command = command;
        this.location = location;
    }

    /**
     * @return the journal location given at construction
     */
    public Location getLocation() {
        return location;
    }

    /**
     * @return the journal command given at construction
     */
    public T getCommand() {
        return command;
    }

    /**
     * Applies the command within the given index transaction.
     *
     * @param tx the index transaction to apply the command in
     * @throws IOException if applying the command fails
     */
    abstract public void execute(Transaction tx) throws IOException;
}
2076
/**
 * Operation carrying a {@code KahaAddMessageCommand}.
 */
class AddOpperation extends Operation<KahaAddMessageCommand> {

    /**
     * @param command the add-message command to carry
     * @param location the journal location associated with the command
     */
    public AddOpperation(KahaAddMessageCommand command, Location location) {
        super(command, location);
    }

    /**
     * Passes the command and its location to the enclosing class.
     * NOTE(review): the callee is spelled {@code upadateIndex} (sic) whereas RemoveOpperation
     * calls {@code updateIndex} - presumably both exist in the enclosing class; confirm.
     */
    @Override
    public void execute(Transaction tx) throws IOException {
        upadateIndex(tx, command, location);
    }

}
2089
/**
 * Operation carrying a {@code KahaRemoveMessageCommand}.
 */
class RemoveOpperation extends Operation<KahaRemoveMessageCommand> {

    /**
     * @param command the remove-message command to carry
     * @param location the journal location associated with the command
     */
    public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
        super(command, location);
    }

    /**
     * Passes the command and its location to {@code updateIndex} of the enclosing class.
     */
    @Override
    public void execute(Transaction tx) throws IOException {
        updateIndex(tx, command, location);
    }
}
2101
2102 // /////////////////////////////////////////////////////////////////
2103 // Initialization related implementation methods.
2104 // /////////////////////////////////////////////////////////////////
2105
2106 private PageFile createPageFile() {
2107 PageFile index = new PageFile(directory, "db");
2108 index.setEnableWriteThread(isEnableIndexWriteAsync());
2109 index.setWriteBatchSize(getIndexWriteBatchSize());
2110 index.setPageCacheSize(indexCacheSize);
2111 index.setUseLFRUEviction(isUseIndexLFRUEviction());
2112 index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2113 index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2114 index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2115 index.setEnablePageCaching(isEnableIndexPageCaching());
2116 return index;
2117 }
2118
2119 private Journal createJournal() throws IOException {
2120 Journal manager = new Journal();
2121 manager.setDirectory(directory);
2122 manager.setMaxFileLength(getJournalMaxFileLength());
2123 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2124 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2125 manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2126 manager.setArchiveDataLogs(isArchiveDataLogs());
2127 manager.setSizeAccumulator(journalSize);
2128 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2129 if (getDirectoryArchive() != null) {
2130 IOHelper.mkdirs(getDirectoryArchive());
2131 manager.setDirectoryArchive(getDirectoryArchive());
2132 }
2133 return manager;
2134 }
2135
/**
 * @return the write batch size that createJournal() configures on the journal
 */
public int getJournalMaxWriteBatchSize() {
    return journalMaxWriteBatchSize;
}

/**
 * @param journalMaxWriteBatchSize the journal write batch size to set
 */
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
    this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
}
2143
/**
 * @return the directory used for both the index page file and the journal
 */
public File getDirectory() {
    return directory;
}

/**
 * @param directory the store directory to set
 */
public void setDirectory(File directory) {
    this.directory = directory;
}
2151
/**
 * @return the deleteAllMessages flag
 */
public boolean isDeleteAllMessages() {
    return deleteAllMessages;
}

/**
 * @param deleteAllMessages the deleteAllMessages flag to set
 */
public void setDeleteAllMessages(boolean deleteAllMessages) {
    this.deleteAllMessages = deleteAllMessages;
}
2159
/**
 * Note that the backing field is itself named {@code setIndexWriteBatchSize}.
 *
 * @param setIndexWriteBatchSize the index write batch size to set
 */
public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
    this.setIndexWriteBatchSize = setIndexWriteBatchSize;
}

/**
 * @return the write batch size that createPageFile() configures on the index page file
 */
public int getIndexWriteBatchSize() {
    return setIndexWriteBatchSize;
}
2167
/**
 * @param enableIndexWriteAsync the enableIndexWriteAsync flag to set
 */
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
    this.enableIndexWriteAsync = enableIndexWriteAsync;
}

/**
 * Package-private, unlike the matching setter.
 *
 * @return the flag createPageFile() passes to the page file's setEnableWriteThread
 */
boolean isEnableIndexWriteAsync() {
    return enableIndexWriteAsync;
}
2175
/**
 * @return the flag createJournal() passes to the journal's setEnableAsyncDiskSync
 */
public boolean isEnableJournalDiskSyncs() {
    return enableJournalDiskSyncs;
}

/**
 * @param syncWrites the enableJournalDiskSyncs flag to set
 */
public void setEnableJournalDiskSyncs(boolean syncWrites) {
    this.enableJournalDiskSyncs = syncWrites;
}
2183
/**
 * @return the checkpoint interval (unit not visible here - presumably milliseconds; confirm)
 */
public long getCheckpointInterval() {
    return checkpointInterval;
}

/**
 * @param checkpointInterval the checkpoint interval to set
 */
public void setCheckpointInterval(long checkpointInterval) {
    this.checkpointInterval = checkpointInterval;
}
2191
/**
 * @return the cleanup interval (unit not visible here - presumably milliseconds; confirm)
 */
public long getCleanupInterval() {
    return cleanupInterval;
}

/**
 * @param cleanupInterval the cleanup interval to set
 */
public void setCleanupInterval(long cleanupInterval) {
    this.cleanupInterval = cleanupInterval;
}
2199
/**
 * @param journalMaxFileLength the maximum journal file length to set
 */
public void setJournalMaxFileLength(int journalMaxFileLength) {
    this.journalMaxFileLength = journalMaxFileLength;
}

/**
 * @return the maximum file length that createJournal() configures on the journal
 */
public int getJournalMaxFileLength() {
    return journalMaxFileLength;
}
2207
/**
 * Delegates to the producer sequence id tracker held by the metadata.
 *
 * @param maxFailoverProducersToTrack the maximum number of producers the tracker should track
 */
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
    this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
}

/**
 * @return the maximum number of producers reported by the metadata's producer sequence id tracker
 */
public int getMaxFailoverProducersToTrack() {
    return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
}
2215
/**
 * Delegates to the producer sequence id tracker held by the metadata.
 *
 * @param failoverProducersAuditDepth the audit depth to set on the tracker
 */
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
    this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
}

/**
 * @return the audit depth reported by the metadata's producer sequence id tracker
 */
public int getFailoverProducersAuditDepth() {
    return this.metadata.producerSequenceIdTracker.getAuditDepth();
}
2223
2224 public PageFile getPageFile() {
2225 if (pageFile == null) {
2226 pageFile = createPageFile();
2227 }
2228 return pageFile;
2229 }
2230
2231 public Journal getJournal() throws IOException {
2232 if (journal == null) {
2233 journal = createJournal();
2234 }
2235 return journal;
2236 }
2237
/**
 * @return the failIfDatabaseIsLocked flag
 */
public boolean isFailIfDatabaseIsLocked() {
    return failIfDatabaseIsLocked;
}

/**
 * @param failIfDatabaseIsLocked the failIfDatabaseIsLocked flag to set
 */
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
    this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
}
2245
/**
 * @return the ignoreMissingJournalfiles flag
 */
public boolean isIgnoreMissingJournalfiles() {
    return ignoreMissingJournalfiles;
}

/**
 * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles flag to set
 */
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
    this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
}
2253
/**
 * @return the page cache size that createPageFile() configures on the index page file
 */
public int getIndexCacheSize() {
    return indexCacheSize;
}

/**
 * @param indexCacheSize the index page cache size to set
 */
public void setIndexCacheSize(int indexCacheSize) {
    this.indexCacheSize = indexCacheSize;
}
2261
/**
 * @return the flag createJournal() passes to setCheckForCorruptionOnStartup; when set it also
 *         turns journal checksums on there
 */
public boolean isCheckForCorruptJournalFiles() {
    return checkForCorruptJournalFiles;
}

/**
 * @param checkForCorruptJournalFiles the checkForCorruptJournalFiles flag to set
 */
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
    this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
}
2269
/**
 * @return the checksumJournalFiles flag; createJournal() enables journal checksums when this
 *         flag or checkForCorruptJournalFiles is set
 */
public boolean isChecksumJournalFiles() {
    return checksumJournalFiles;
}

/**
 * @param checksumJournalFiles the checksumJournalFiles flag to set
 */
public void setChecksumJournalFiles(boolean checksumJournalFiles) {
    this.checksumJournalFiles = checksumJournalFiles;
}
2277
/**
 * @param brokerService the broker service to set
 */
public void setBrokerService(BrokerService brokerService) {
    this.brokerService = brokerService;
}
2281
2282 /**
2283 * @return the archiveDataLogs
2284 */
2285 public boolean isArchiveDataLogs() {
2286 return this.archiveDataLogs;
2287 }
2288
2289 /**
2290 * @param archiveDataLogs the archiveDataLogs to set
2291 */
2292 public void setArchiveDataLogs(boolean archiveDataLogs) {
2293 this.archiveDataLogs = archiveDataLogs;
2294 }
2295
2296 /**
2297 * @return the directoryArchive
2298 */
2299 public File getDirectoryArchive() {
2300 return this.directoryArchive;
2301 }
2302
2303 /**
2304 * @param directoryArchive the directoryArchive to set
2305 */
2306 public void setDirectoryArchive(File directoryArchive) {
2307 this.directoryArchive = directoryArchive;
2308 }
2309
/**
 * @return the rewriteOnRedelivery flag
 */
public boolean isRewriteOnRedelivery() {
    return rewriteOnRedelivery;
}

/**
 * @param rewriteOnRedelivery the rewriteOnRedelivery flag to set
 */
public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
    this.rewriteOnRedelivery = rewriteOnRedelivery;
}
2317
/**
 * @return the archiveCorruptedIndex flag
 */
public boolean isArchiveCorruptedIndex() {
    return archiveCorruptedIndex;
}

/**
 * @param archiveCorruptedIndex the archiveCorruptedIndex flag to set
 */
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
    this.archiveCorruptedIndex = archiveCorruptedIndex;
}
2325
/**
 * @return the LFU eviction factor that createPageFile() configures on the index page file
 */
public float getIndexLFUEvictionFactor() {
    return indexLFUEvictionFactor;
}

/**
 * @param indexLFUEvictionFactor the LFU eviction factor to set
 */
public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
    this.indexLFUEvictionFactor = indexLFUEvictionFactor;
}
2333
/**
 * @return the flag createPageFile() passes to the page file's setUseLFRUEviction
 */
public boolean isUseIndexLFRUEviction() {
    return useIndexLFRUEviction;
}

/**
 * @param useIndexLFRUEviction the useIndexLFRUEviction flag to set
 */
public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
    this.useIndexLFRUEviction = useIndexLFRUEviction;
}
2341
/**
 * @param enableIndexDiskSyncs the enableIndexDiskSyncs flag to set
 */
public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
    this.enableIndexDiskSyncs = enableIndexDiskSyncs;
}

/**
 * @param enableIndexRecoveryFile the enableIndexRecoveryFile flag to set
 */
public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
    this.enableIndexRecoveryFile = enableIndexRecoveryFile;
}

/**
 * @param enableIndexPageCaching the enableIndexPageCaching flag to set
 */
public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
    this.enableIndexPageCaching = enableIndexPageCaching;
}

/**
 * @return the flag createPageFile() passes to the page file's setEnableDiskSyncs
 */
public boolean isEnableIndexDiskSyncs() {
    return enableIndexDiskSyncs;
}

/**
 * @return the flag createPageFile() passes to the page file's setEnableRecoveryFile
 */
public boolean isEnableIndexRecoveryFile() {
    return enableIndexRecoveryFile;
}

/**
 * @return the flag createPageFile() passes to the page file's setEnablePageCaching
 */
public boolean isEnableIndexPageCaching() {
    return enableIndexPageCaching;
}
2365
2366 // /////////////////////////////////////////////////////////////////
2367 // Internal conversion methods.
2368 // /////////////////////////////////////////////////////////////////
2369
/**
 * Holds one cursor position for each of the three priority order indexes
 * (default, low and high).
 */
class MessageOrderCursor {
    long defaultCursorPosition;
    long lowPriorityCursorPosition;
    long highPriorityCursorPosition;

    /** Creates a cursor with all three positions at zero. */
    MessageOrderCursor() {
    }

    /** Creates a cursor with all three positions set to {@code position}. */
    MessageOrderCursor(long position) {
        this.defaultCursorPosition = this.lowPriorityCursorPosition = this.highPriorityCursorPosition = position;
    }

    /** Creates a cursor holding the same positions as {@code other}. */
    MessageOrderCursor(MessageOrderCursor other) {
        this.highPriorityCursorPosition = other.highPriorityCursorPosition;
        this.lowPriorityCursorPosition = other.lowPriorityCursorPosition;
        this.defaultCursorPosition = other.defaultCursorPosition;
    }

    /** @return a new, independent cursor with the same positions */
    MessageOrderCursor copy() {
        final MessageOrderCursor duplicate = new MessageOrderCursor(this);
        return duplicate;
    }

    /** Moves all three positions back to zero. */
    void reset() {
        defaultCursorPosition = highPriorityCursorPosition = lowPriorityCursorPosition = 0;
    }

    /** Advances every position that is not zero by one; zero positions stay untouched. */
    void increment() {
        defaultCursorPosition += (defaultCursorPosition == 0) ? 0 : 1;
        highPriorityCursorPosition += (highPriorityCursorPosition == 0) ? 0 : 1;
        lowPriorityCursorPosition += (lowPriorityCursorPosition == 0) ? 0 : 1;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MessageOrderCursor:[def:");
        text.append(defaultCursorPosition);
        text.append(", low:").append(lowPriorityCursorPosition);
        text.append(", high:").append(highPriorityCursorPosition);
        text.append("]");
        return text.toString();
    }

    /** Overwrites the positions of this cursor with those of {@code other}. */
    public void sync(MessageOrderCursor other) {
        this.highPriorityCursorPosition = other.highPriorityCursorPosition;
        this.lowPriorityCursorPosition = other.lowPriorityCursorPosition;
        this.defaultCursorPosition = other.defaultCursorPosition;
    }
}
2423
2424 class MessageOrderIndex {
2425 static final byte HI = 9;
2426 static final byte LO = 0;
2427 static final byte DEF = 4;
2428
2429 long nextMessageId;
2430 BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
2431 BTreeIndex<Long, MessageKeys> lowPriorityIndex;
2432 BTreeIndex<Long, MessageKeys> highPriorityIndex;
2433 MessageOrderCursor cursor = new MessageOrderCursor();
2434 Long lastDefaultKey;
2435 Long lastHighKey;
2436 Long lastLowKey;
2437 byte lastGetPriority;
2438
2439 MessageKeys remove(Transaction tx, Long key) throws IOException {
2440 MessageKeys result = defaultPriorityIndex.remove(tx, key);
2441 if (result == null && highPriorityIndex!=null) {
2442 result = highPriorityIndex.remove(tx, key);
2443 if (result ==null && lowPriorityIndex!=null) {
2444 result = lowPriorityIndex.remove(tx, key);
2445 }
2446 }
2447 return result;
2448 }
2449
2450 void load(Transaction tx) throws IOException {
2451 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2452 defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2453 defaultPriorityIndex.load(tx);
2454 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2455 lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2456 lowPriorityIndex.load(tx);
2457 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2458 highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2459 highPriorityIndex.load(tx);
2460 }
2461
2462 void allocate(Transaction tx) throws IOException {
2463 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2464 if (metadata.version >= 2) {
2465 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2466 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2467 }
2468 }
2469
2470 void configureLast(Transaction tx) throws IOException {
2471 // Figure out the next key using the last entry in the destination.
2472 if (highPriorityIndex != null) {
2473 Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2474 if (lastEntry != null) {
2475 nextMessageId = lastEntry.getKey() + 1;
2476 } else {
2477 lastEntry = defaultPriorityIndex.getLast(tx);
2478 if (lastEntry != null) {
2479 nextMessageId = lastEntry.getKey() + 1;
2480 } else {
2481 lastEntry = lowPriorityIndex.getLast(tx);
2482 if (lastEntry != null) {
2483 nextMessageId = lastEntry.getKey() + 1;
2484 }
2485 }
2486 }
2487 } else {
2488 Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2489 if (lastEntry != null) {
2490 nextMessageId = lastEntry.getKey() + 1;
2491 }
2492 }
2493 }
2494
2495 void clear(Transaction tx) throws IOException {
2496 this.remove(tx);
2497 this.resetCursorPosition();
2498 this.allocate(tx);
2499 this.load(tx);
2500 this.configureLast(tx);
2501 }
2502
2503 void remove(Transaction tx) throws IOException {
2504 defaultPriorityIndex.clear(tx);
2505 defaultPriorityIndex.unload(tx);
2506 tx.free(defaultPriorityIndex.getPageId());
2507 if (lowPriorityIndex != null) {
2508 lowPriorityIndex.clear(tx);
2509 lowPriorityIndex.unload(tx);
2510
2511 tx.free(lowPriorityIndex.getPageId());
2512 }
2513 if (highPriorityIndex != null) {
2514 highPriorityIndex.clear(tx);
2515 highPriorityIndex.unload(tx);
2516 tx.free(highPriorityIndex.getPageId());
2517 }
2518 }
2519
2520 void resetCursorPosition() {
2521 this.cursor.reset();
2522 lastDefaultKey = null;
2523 lastHighKey = null;
2524 lastLowKey = null;
2525 }
2526
2527 void setBatch(Transaction tx, Long sequence) throws IOException {
2528 if (sequence != null) {
2529 Long nextPosition = new Long(sequence.longValue() + 1);
2530 if (defaultPriorityIndex.containsKey(tx, sequence)) {
2531 lastDefaultKey = sequence;
2532 cursor.defaultCursorPosition = nextPosition.longValue();
2533 } else if (highPriorityIndex != null) {
2534 if (highPriorityIndex.containsKey(tx, sequence)) {
2535 lastHighKey = sequence;
2536 cursor.highPriorityCursorPosition = nextPosition.longValue();
2537 } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2538 lastLowKey = sequence;
2539 cursor.lowPriorityCursorPosition = nextPosition.longValue();
2540 }
2541 } else {
2542 LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
2543 lastDefaultKey = sequence;
2544 cursor.defaultCursorPosition = nextPosition.longValue();
2545 }
2546 }
2547 }
2548
2549 void setBatch(Transaction tx, LastAck last) throws IOException {
2550 setBatch(tx, last.lastAckedSequence);
2551 if (cursor.defaultCursorPosition == 0
2552 && cursor.highPriorityCursorPosition == 0
2553 && cursor.lowPriorityCursorPosition == 0) {
2554 long next = last.lastAckedSequence + 1;
2555 switch (last.priority) {
2556 case DEF:
2557 cursor.defaultCursorPosition = next;
2558 cursor.highPriorityCursorPosition = next;
2559 break;
2560 case HI:
2561 cursor.highPriorityCursorPosition = next;
2562 break;
2563 case LO:
2564 cursor.lowPriorityCursorPosition = next;
2565 cursor.defaultCursorPosition = next;
2566 cursor.highPriorityCursorPosition = next;
2567 break;
2568 }
2569 }
2570 }
2571
2572 void stoppedIterating() {
2573 if (lastDefaultKey!=null) {
2574 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2575 }
2576 if (lastHighKey!=null) {
2577 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2578 }
2579 if (lastLowKey!=null) {
2580 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2581 }
2582 lastDefaultKey = null;
2583 lastHighKey = null;
2584 lastLowKey = null;
2585 }
2586
2587 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2588 throws IOException {
2589 if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2590 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2591 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2592 getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2593 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2594 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2595 }
2596 }
2597
2598 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2599 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2600
2601 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2602 deletes.add(iterator.next());
2603 }
2604
2605 long getNextMessageId(int priority) {
2606 return nextMessageId++;
2607 }
2608
2609 MessageKeys get(Transaction tx, Long key) throws IOException {
2610 MessageKeys result = defaultPriorityIndex.get(tx, key);
2611 if (result == null) {
2612 result = highPriorityIndex.get(tx, key);
2613 if (result == null) {
2614 result = lowPriorityIndex.get(tx, key);
2615 lastGetPriority = LO;
2616 } else {
2617 lastGetPriority = HI;
2618 }
2619 } else {
2620 lastGetPriority = DEF;
2621 }
2622 return result;
2623 }
2624
2625 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2626 if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2627 return defaultPriorityIndex.put(tx, key, value);
2628 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2629 return highPriorityIndex.put(tx, key, value);
2630 } else {
2631 return lowPriorityIndex.put(tx, key, value);
2632 }
2633 }
2634
2635 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2636 return new MessageOrderIterator(tx,cursor);
2637 }
2638
2639 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2640 return new MessageOrderIterator(tx,m);
2641 }
2642
2643 public byte lastGetPriority() {
2644 return lastGetPriority;
2645 }
2646
2647 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
2648 Iterator<Entry<Long, MessageKeys>>currentIterator;
2649 final Iterator<Entry<Long, MessageKeys>>highIterator;
2650 final Iterator<Entry<Long, MessageKeys>>defaultIterator;
2651 final Iterator<Entry<Long, MessageKeys>>lowIterator;
2652
2653 MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2654 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2655 if (highPriorityIndex != null) {
2656 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2657 } else {
2658 this.highIterator = null;
2659 }
2660 if (lowPriorityIndex != null) {
2661 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2662 } else {
2663 this.lowIterator = null;
2664 }
2665 }
2666
2667 public boolean hasNext() {
2668 if (currentIterator == null) {
2669 if (highIterator != null) {
2670 if (highIterator.hasNext()) {
2671 currentIterator = highIterator;
2672 return currentIterator.hasNext();
2673 }
2674 if (defaultIterator.hasNext()) {
2675 currentIterator = defaultIterator;
2676 return currentIterator.hasNext();
2677 }
2678 if (lowIterator.hasNext()) {
2679 currentIterator = lowIterator;
2680 return currentIterator.hasNext();
2681 }
2682 return false;
2683 } else {
2684 currentIterator = defaultIterator;
2685 return currentIterator.hasNext();
2686 }
2687 }
2688 if (highIterator != null) {
2689 if (currentIterator.hasNext()) {
2690 return true;
2691 }
2692 if (currentIterator == highIterator) {
2693 if (defaultIterator.hasNext()) {
2694 currentIterator = defaultIterator;
2695 return currentIterator.hasNext();
2696 }
2697 if (lowIterator.hasNext()) {
2698 currentIterator = lowIterator;
2699 return currentIterator.hasNext();
2700 }
2701 return false;
2702 }
2703
2704 if (currentIterator == defaultIterator) {
2705 if (lowIterator.hasNext()) {
2706 currentIterator = lowIterator;
2707 return currentIterator.hasNext();
2708 }
2709 return false;
2710 }
2711 }
2712 return currentIterator.hasNext();
2713 }
2714
2715 public Entry<Long, MessageKeys> next() {
2716 Entry<Long, MessageKeys> result = currentIterator.next();
2717 if (result != null) {
2718 Long key = result.getKey();
2719 if (highIterator != null) {
2720 if (currentIterator == defaultIterator) {
2721 lastDefaultKey = key;
2722 } else if (currentIterator == highIterator) {
2723 lastHighKey = key;
2724 } else {
2725 lastLowKey = key;
2726 }
2727 } else {
2728 lastDefaultKey = key;
2729 }
2730 }
2731 return result;
2732 }
2733
2734 public void remove() {
2735 throw new UnsupportedOperationException();
2736 }
2737
2738 }
2739 }
2740
2741 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2742 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2743
2744 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2745 ByteArrayOutputStream baos = new ByteArrayOutputStream();
2746 ObjectOutputStream oout = new ObjectOutputStream(baos);
2747 oout.writeObject(object);
2748 oout.flush();
2749 oout.close();
2750 byte[] data = baos.toByteArray();
2751 dataOut.writeInt(data.length);
2752 dataOut.write(data);
2753 }
2754
2755 @SuppressWarnings("unchecked")
2756 public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2757 int dataLen = dataIn.readInt();
2758 byte[] data = new byte[dataLen];
2759 dataIn.readFully(data);
2760 ByteArrayInputStream bais = new ByteArrayInputStream(data);
2761 ObjectInputStream oin = new ObjectInputStream(bais);
2762 try {
2763 return (HashSet<String>) oin.readObject();
2764 } catch (ClassNotFoundException cfe) {
2765 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2766 ioe.initCause(cfe);
2767 throw ioe;
2768 }
2769 }
2770 }
2771 }