001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.ByteArrayOutputStream;
021 import java.io.DataInput;
022 import java.io.DataOutput;
023 import java.io.EOFException;
024 import java.io.File;
025 import java.io.IOException;
026 import java.io.InputStream;
027 import java.io.InterruptedIOException;
028 import java.io.ObjectInputStream;
029 import java.io.ObjectOutputStream;
030 import java.io.OutputStream;
031 import java.util.ArrayList;
032 import java.util.Arrays;
033 import java.util.Collection;
034 import java.util.Collections;
035 import java.util.Date;
036 import java.util.HashMap;
037 import java.util.HashSet;
038 import java.util.Iterator;
039 import java.util.LinkedHashMap;
040 import java.util.LinkedHashSet;
041 import java.util.List;
042 import java.util.Map;
043 import java.util.Map.Entry;
044 import java.util.Set;
045 import java.util.SortedSet;
046 import java.util.Stack;
047 import java.util.TreeMap;
048 import java.util.TreeSet;
049 import java.util.concurrent.atomic.AtomicBoolean;
050 import java.util.concurrent.atomic.AtomicLong;
051 import java.util.concurrent.locks.ReentrantReadWriteLock;
052
053 import org.apache.activemq.ActiveMQMessageAuditNoSync;
054 import org.apache.activemq.broker.BrokerService;
055 import org.apache.activemq.broker.BrokerServiceAware;
056 import org.apache.activemq.command.MessageAck;
057 import org.apache.activemq.command.SubscriptionInfo;
058 import org.apache.activemq.command.TransactionId;
059 import org.apache.activemq.protobuf.Buffer;
060 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
061 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
062 import org.apache.activemq.store.kahadb.data.KahaDestination;
063 import org.apache.activemq.store.kahadb.data.KahaEntryType;
064 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
065 import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
066 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
067 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
068 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
069 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
070 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
071 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
072 import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
073 import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
074 import org.apache.activemq.store.kahadb.disk.index.ListIndex;
075 import org.apache.activemq.store.kahadb.disk.journal.DataFile;
076 import org.apache.activemq.store.kahadb.disk.journal.Journal;
077 import org.apache.activemq.store.kahadb.disk.journal.Location;
078 import org.apache.activemq.store.kahadb.disk.page.Page;
079 import org.apache.activemq.store.kahadb.disk.page.PageFile;
080 import org.apache.activemq.store.kahadb.disk.page.Transaction;
081 import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
082 import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
083 import org.apache.activemq.store.kahadb.disk.util.Marshaller;
084 import org.apache.activemq.store.kahadb.disk.util.Sequence;
085 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
086 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
087 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
088 import org.apache.activemq.util.ByteSequence;
089 import org.apache.activemq.util.Callback;
090 import org.apache.activemq.util.DataByteArrayInputStream;
091 import org.apache.activemq.util.DataByteArrayOutputStream;
092 import org.apache.activemq.util.IOHelper;
093 import org.apache.activemq.util.ServiceStopper;
094 import org.apache.activemq.util.ServiceSupport;
095 import org.slf4j.Logger;
096 import org.slf4j.LoggerFactory;
097
098 public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
099
100 protected BrokerService brokerService;
101
102 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
103 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
104 public static final File DEFAULT_DIRECTORY = new File("KahaDB");
105 protected static final Buffer UNMATCHED;
106 static {
107 UNMATCHED = new Buffer(new byte[]{});
108 }
109 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
110 private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
111
112 static final int CLOSED_STATE = 1;
113 static final int OPEN_STATE = 2;
114 static final long NOT_ACKED = -1;
115
116 static final int VERSION = 4;
117
118 protected class Metadata {
119 protected Page<Metadata> page;
120 protected int state;
121 protected BTreeIndex<String, StoredDestination> destinations;
122 protected Location lastUpdate;
123 protected Location firstInProgressTransactionLocation;
124 protected Location producerSequenceIdTrackerLocation = null;
125 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
126 protected int version = VERSION;
127 public void read(DataInput is) throws IOException {
128 state = is.readInt();
129 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
130 if (is.readBoolean()) {
131 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
132 } else {
133 lastUpdate = null;
134 }
135 if (is.readBoolean()) {
136 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
137 } else {
138 firstInProgressTransactionLocation = null;
139 }
140 try {
141 if (is.readBoolean()) {
142 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
143 } else {
144 producerSequenceIdTrackerLocation = null;
145 }
146 } catch (EOFException expectedOnUpgrade) {
147 }
148 try {
149 version = is.readInt();
150 } catch (EOFException expectedOnUpgrade) {
151 version=1;
152 }
153 LOG.info("KahaDB is version " + version);
154 }
155
156 public void write(DataOutput os) throws IOException {
157 os.writeInt(state);
158 os.writeLong(destinations.getPageId());
159
160 if (lastUpdate != null) {
161 os.writeBoolean(true);
162 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
163 } else {
164 os.writeBoolean(false);
165 }
166
167 if (firstInProgressTransactionLocation != null) {
168 os.writeBoolean(true);
169 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
170 } else {
171 os.writeBoolean(false);
172 }
173
174 if (producerSequenceIdTrackerLocation != null) {
175 os.writeBoolean(true);
176 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
177 } else {
178 os.writeBoolean(false);
179 }
180 os.writeInt(VERSION);
181 }
182 }
183
184 class MetadataMarshaller extends VariableMarshaller<Metadata> {
185 @Override
186 public Metadata readPayload(DataInput dataIn) throws IOException {
187 Metadata rc = new Metadata();
188 rc.read(dataIn);
189 return rc;
190 }
191
192 @Override
193 public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
194 object.write(dataOut);
195 }
196 }
197
198 protected PageFile pageFile;
199 protected Journal journal;
200 protected Metadata metadata = new Metadata();
201
202 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
203
204 protected boolean failIfDatabaseIsLocked;
205
206 protected boolean deleteAllMessages;
207 protected File directory = DEFAULT_DIRECTORY;
208 protected Thread checkpointThread;
209 protected boolean enableJournalDiskSyncs=true;
210 protected boolean archiveDataLogs;
211 protected File directoryArchive;
212 protected AtomicLong journalSize = new AtomicLong(0);
213 long checkpointInterval = 5*1000;
214 long cleanupInterval = 30*1000;
215 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
216 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
217 boolean enableIndexWriteAsync = false;
218 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
219
220 protected AtomicBoolean opened = new AtomicBoolean();
221 private boolean ignoreMissingJournalfiles = false;
222 private int indexCacheSize = 10000;
223 private boolean checkForCorruptJournalFiles = false;
224 private boolean checksumJournalFiles = false;
225 protected boolean forceRecoverIndex = false;
226 private final Object checkpointThreadLock = new Object();
227 private boolean rewriteOnRedelivery = false;
228 private boolean archiveCorruptedIndex = false;
229 private boolean useIndexLFRUEviction = false;
230 private float indexLFUEvictionFactor = 0.2f;
231 private boolean enableIndexDiskSyncs = true;
232 private boolean enableIndexRecoveryFile = true;
233 private boolean enableIndexPageCaching = true;
234
235 public MessageDatabase() {
236 }
237
238 @Override
239 public void doStart() throws Exception {
240 load();
241 }
242
243 @Override
244 public void doStop(ServiceStopper stopper) throws Exception {
245 unload();
246 }
247
248 private void loadPageFile() throws IOException {
249 this.indexLock.writeLock().lock();
250 try {
251 final PageFile pageFile = getPageFile();
252 pageFile.load();
253 pageFile.tx().execute(new Transaction.Closure<IOException>() {
254 @Override
255 public void execute(Transaction tx) throws IOException {
256 if (pageFile.getPageCount() == 0) {
257 // First time this is created.. Initialize the metadata
258 Page<Metadata> page = tx.allocate();
259 assert page.getPageId() == 0;
260 page.set(metadata);
261 metadata.page = page;
262 metadata.state = CLOSED_STATE;
263 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
264
265 tx.store(metadata.page, metadataMarshaller, true);
266 } else {
267 Page<Metadata> page = tx.load(0, metadataMarshaller);
268 metadata = page.get();
269 metadata.page = page;
270 }
271 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
272 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
273 metadata.destinations.load(tx);
274 }
275 });
276 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
277 // Perhaps we should just keep an index of file
278 storedDestinations.clear();
279 pageFile.tx().execute(new Transaction.Closure<IOException>() {
280 @Override
281 public void execute(Transaction tx) throws IOException {
282 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
283 Entry<String, StoredDestination> entry = iterator.next();
284 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
285 storedDestinations.put(entry.getKey(), sd);
286 }
287 }
288 });
289 pageFile.flush();
290 } finally {
291 this.indexLock.writeLock().unlock();
292 }
293 }
294
295 private void startCheckpoint() {
296 if (checkpointInterval == 0 && cleanupInterval == 0) {
297 LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
298 return;
299 }
300 synchronized (checkpointThreadLock) {
301 boolean start = false;
302 if (checkpointThread == null) {
303 start = true;
304 } else if (!checkpointThread.isAlive()) {
305 start = true;
306 LOG.info("KahaDB: Recovering checkpoint thread after death");
307 }
308 if (start) {
309 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
310 @Override
311 public void run() {
312 try {
313 long lastCleanup = System.currentTimeMillis();
314 long lastCheckpoint = System.currentTimeMillis();
315 // Sleep for a short time so we can periodically check
316 // to see if we need to exit this thread.
317 long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
318 while (opened.get()) {
319 Thread.sleep(sleepTime);
320 long now = System.currentTimeMillis();
321 if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
322 checkpointCleanup(true);
323 lastCleanup = now;
324 lastCheckpoint = now;
325 } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
326 checkpointCleanup(false);
327 lastCheckpoint = now;
328 }
329 }
330 } catch (InterruptedException e) {
331 // Looks like someone really wants us to exit this thread...
332 } catch (IOException ioe) {
333 LOG.error("Checkpoint failed", ioe);
334 brokerService.handleIOException(ioe);
335 }
336 }
337 };
338
339 checkpointThread.setDaemon(true);
340 checkpointThread.start();
341 }
342 }
343 }
344
345 public void open() throws IOException {
346 if( opened.compareAndSet(false, true) ) {
347 getJournal().start();
348 try {
349 loadPageFile();
350 } catch (Throwable t) {
351 LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
352 if (LOG.isDebugEnabled()) {
353 LOG.debug("Index load failure", t);
354 }
355 // try to recover index
356 try {
357 pageFile.unload();
358 } catch (Exception ignore) {}
359 if (archiveCorruptedIndex) {
360 pageFile.archive();
361 } else {
362 pageFile.delete();
363 }
364 metadata = new Metadata();
365 pageFile = null;
366 loadPageFile();
367 }
368 startCheckpoint();
369 recover();
370 }
371 }
372
373 public void load() throws IOException {
374 this.indexLock.writeLock().lock();
375 IOHelper.mkdirs(directory);
376 try {
377 if (deleteAllMessages) {
378 getJournal().start();
379 getJournal().delete();
380 getJournal().close();
381 journal = null;
382 getPageFile().delete();
383 LOG.info("Persistence store purged.");
384 deleteAllMessages = false;
385 }
386
387 open();
388 store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
389 } finally {
390 this.indexLock.writeLock().unlock();
391 }
392 }
393
394 public void close() throws IOException, InterruptedException {
395 if( opened.compareAndSet(true, false)) {
396 this.indexLock.writeLock().lock();
397 try {
398 if (metadata.page != null) {
399 pageFile.tx().execute(new Transaction.Closure<IOException>() {
400 @Override
401 public void execute(Transaction tx) throws IOException {
402 checkpointUpdate(tx, true);
403 }
404 });
405 }
406 pageFile.unload();
407 metadata = new Metadata();
408 } finally {
409 this.indexLock.writeLock().unlock();
410 }
411 journal.close();
412 synchronized (checkpointThreadLock) {
413 if (checkpointThread != null) {
414 checkpointThread.join();
415 }
416 }
417 }
418 }
419
420 public void unload() throws IOException, InterruptedException {
421 this.indexLock.writeLock().lock();
422 try {
423 if( pageFile != null && pageFile.isLoaded() ) {
424 metadata.state = CLOSED_STATE;
425 metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
426
427 if (metadata.page != null) {
428 pageFile.tx().execute(new Transaction.Closure<IOException>() {
429 @Override
430 public void execute(Transaction tx) throws IOException {
431 tx.store(metadata.page, metadataMarshaller, true);
432 }
433 });
434 }
435 }
436 } finally {
437 this.indexLock.writeLock().unlock();
438 }
439 close();
440 }
441
442 // public for testing
443 @SuppressWarnings("rawtypes")
444 public Location[] getInProgressTxLocationRange() {
445 Location[] range = new Location[]{null, null};
446 synchronized (inflightTransactions) {
447 if (!inflightTransactions.isEmpty()) {
448 for (List<Operation> ops : inflightTransactions.values()) {
449 if (!ops.isEmpty()) {
450 trackMaxAndMin(range, ops);
451 }
452 }
453 }
454 if (!preparedTransactions.isEmpty()) {
455 for (List<Operation> ops : preparedTransactions.values()) {
456 if (!ops.isEmpty()) {
457 trackMaxAndMin(range, ops);
458 }
459 }
460 }
461 }
462 return range;
463 }
464
465 private void trackMaxAndMin(Location[] range, List<Operation> ops) {
466 Location t = ops.get(0).getLocation();
467 if (range[0]==null || t.compareTo(range[0]) <= 0) {
468 range[0] = t;
469 }
470 t = ops.get(ops.size() -1).getLocation();
471 if (range[1]==null || t.compareTo(range[1]) >= 0) {
472 range[1] = t;
473 }
474 }
475
476 class TranInfo {
477 TransactionId id;
478 Location location;
479
480 class opCount {
481 int add;
482 int remove;
483 }
484 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
485
486 public void track(Operation operation) {
487 if (location == null ) {
488 location = operation.getLocation();
489 }
490 KahaDestination destination;
491 boolean isAdd = false;
492 if (operation instanceof AddOpperation) {
493 AddOpperation add = (AddOpperation) operation;
494 destination = add.getCommand().getDestination();
495 isAdd = true;
496 } else {
497 RemoveOpperation removeOpperation = (RemoveOpperation) operation;
498 destination = removeOpperation.getCommand().getDestination();
499 }
500 opCount opCount = destinationOpCount.get(destination);
501 if (opCount == null) {
502 opCount = new opCount();
503 destinationOpCount.put(destination, opCount);
504 }
505 if (isAdd) {
506 opCount.add++;
507 } else {
508 opCount.remove++;
509 }
510 }
511
512 @Override
513 public String toString() {
514 StringBuffer buffer = new StringBuffer();
515 buffer.append(location).append(";").append(id).append(";\n");
516 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
517 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
518 }
519 return buffer.toString();
520 }
521 }
522
523 @SuppressWarnings("rawtypes")
524 public String getTransactions() {
525
526 ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
527 synchronized (inflightTransactions) {
528 if (!inflightTransactions.isEmpty()) {
529 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
530 TranInfo info = new TranInfo();
531 info.id = entry.getKey();
532 for (Operation operation : entry.getValue()) {
533 info.track(operation);
534 }
535 infos.add(info);
536 }
537 }
538 }
539 return infos.toString();
540 }
541
542 /**
543 * Move all the messages that were in the journal into long term storage. We
544 * just replay and do a checkpoint.
545 *
546 * @throws IOException
547 * @throws IOException
548 * @throws IllegalStateException
549 */
550 private void recover() throws IllegalStateException, IOException {
551 this.indexLock.writeLock().lock();
552 try {
553
554 long start = System.currentTimeMillis();
555 Location producerAuditPosition = recoverProducerAudit();
556 Location lastIndoubtPosition = getRecoveryPosition();
557
558 Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
559
560 if (recoveryPosition != null) {
561 int redoCounter = 0;
562 LOG.info("Recovering from the journal ...");
563 while (recoveryPosition != null) {
564 JournalCommand<?> message = load(recoveryPosition);
565 metadata.lastUpdate = recoveryPosition;
566 process(message, recoveryPosition, lastIndoubtPosition);
567 redoCounter++;
568 recoveryPosition = journal.getNextLocation(recoveryPosition);
569 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
570 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
571 }
572 }
573 if (LOG.isInfoEnabled()) {
574 long end = System.currentTimeMillis();
575 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
576 }
577 }
578
579 // We may have to undo some index updates.
580 pageFile.tx().execute(new Transaction.Closure<IOException>() {
581 @Override
582 public void execute(Transaction tx) throws IOException {
583 recoverIndex(tx);
584 }
585 });
586
587 // rollback any recovered inflight local transactions
588 Set<TransactionId> toRollback = new HashSet<TransactionId>();
589 synchronized (inflightTransactions) {
590 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
591 TransactionId id = it.next();
592 if (id.isLocalTransaction()) {
593 toRollback.add(id);
594 }
595 }
596 for (TransactionId tx: toRollback) {
597 if (LOG.isDebugEnabled()) {
598 LOG.debug("rolling back recovered indoubt local transaction " + tx);
599 }
600 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
601 }
602 }
603 } finally {
604 this.indexLock.writeLock().unlock();
605 }
606 }
607
608 @SuppressWarnings("unused")
609 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
610 return TransactionIdConversion.convertToLocal(tx);
611 }
612
613 private Location minimum(Location producerAuditPosition,
614 Location lastIndoubtPosition) {
615 Location min = null;
616 if (producerAuditPosition != null) {
617 min = producerAuditPosition;
618 if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
619 min = lastIndoubtPosition;
620 }
621 } else {
622 min = lastIndoubtPosition;
623 }
624 return min;
625 }
626
627 private Location recoverProducerAudit() throws IOException {
628 if (metadata.producerSequenceIdTrackerLocation != null) {
629 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
630 try {
631 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
632 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
633 return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
634 } catch (Exception e) {
635 LOG.warn("Cannot recover message audit", e);
636 return journal.getNextLocation(null);
637 }
638 } else {
639 // got no audit stored so got to recreate via replay from start of the journal
640 return journal.getNextLocation(null);
641 }
642 }
643
644 protected void recoverIndex(Transaction tx) throws IOException {
645 long start = System.currentTimeMillis();
646 // It is possible index updates got applied before the journal updates..
647 // in that case we need to removed references to messages that are not in the journal
648 final Location lastAppendLocation = journal.getLastAppendLocation();
649 long undoCounter=0;
650
651 // Go through all the destinations to see if they have messages past the lastAppendLocation
652 for (StoredDestination sd : storedDestinations.values()) {
653
654 final ArrayList<Long> matches = new ArrayList<Long>();
655 // Find all the Locations that are >= than the last Append Location.
656 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
657 @Override
658 protected void matched(Location key, Long value) {
659 matches.add(value);
660 }
661 });
662
663 for (Long sequenceId : matches) {
664 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
665 sd.locationIndex.remove(tx, keys.location);
666 sd.messageIdIndex.remove(tx, keys.messageId);
667 metadata.producerSequenceIdTracker.rollback(keys.messageId);
668 undoCounter++;
669 // TODO: do we need to modify the ack positions for the pub sub case?
670 }
671 }
672
673 if( undoCounter > 0 ) {
674 // The rolledback operations are basically in flight journal writes. To avoid getting
675 // these the end user should do sync writes to the journal.
676 if (LOG.isInfoEnabled()) {
677 long end = System.currentTimeMillis();
678 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
679 }
680 }
681
682 undoCounter = 0;
683 start = System.currentTimeMillis();
684
685 // Lets be extra paranoid here and verify that all the datafiles being referenced
686 // by the indexes still exists.
687
688 final SequenceSet ss = new SequenceSet();
689 for (StoredDestination sd : storedDestinations.values()) {
690 // Use a visitor to cut down the number of pages that we load
691 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
692 int last=-1;
693
694 @Override
695 public boolean isInterestedInKeysBetween(Location first, Location second) {
696 if( first==null ) {
697 return !ss.contains(0, second.getDataFileId());
698 } else if( second==null ) {
699 return true;
700 } else {
701 return !ss.contains(first.getDataFileId(), second.getDataFileId());
702 }
703 }
704
705 @Override
706 public void visit(List<Location> keys, List<Long> values) {
707 for (Location l : keys) {
708 int fileId = l.getDataFileId();
709 if( last != fileId ) {
710 ss.add(fileId);
711 last = fileId;
712 }
713 }
714 }
715
716 });
717 }
718 HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
719 while (!ss.isEmpty()) {
720 missingJournalFiles.add((int) ss.removeFirst());
721 }
722 missingJournalFiles.removeAll(journal.getFileMap().keySet());
723
724 if (!missingJournalFiles.isEmpty()) {
725 if (LOG.isInfoEnabled()) {
726 LOG.info("Some journal files are missing: " + missingJournalFiles);
727 }
728 }
729
730 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
731 for (Integer missing : missingJournalFiles) {
732 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
733 }
734
735 if (checkForCorruptJournalFiles) {
736 Collection<DataFile> dataFiles = journal.getFileMap().values();
737 for (DataFile dataFile : dataFiles) {
738 int id = dataFile.getDataFileId();
739 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
740 Sequence seq = dataFile.getCorruptedBlocks().getHead();
741 while (seq != null) {
742 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
743 seq = seq.getNext();
744 }
745 }
746 }
747
748 if (!missingPredicates.isEmpty()) {
749 for (StoredDestination sd : storedDestinations.values()) {
750
751 final ArrayList<Long> matches = new ArrayList<Long>();
752 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
753 @Override
754 protected void matched(Location key, Long value) {
755 matches.add(value);
756 }
757 });
758
759 // If somes message references are affected by the missing data files...
760 if (!matches.isEmpty()) {
761
762 // We either 'gracefully' recover dropping the missing messages or
763 // we error out.
764 if( ignoreMissingJournalfiles ) {
765 // Update the index to remove the references to the missing data
766 for (Long sequenceId : matches) {
767 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
768 sd.locationIndex.remove(tx, keys.location);
769 sd.messageIdIndex.remove(tx, keys.messageId);
770 undoCounter++;
771 // TODO: do we need to modify the ack positions for the pub sub case?
772 }
773
774 } else {
775 throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
776 }
777 }
778 }
779 }
780
781 if( undoCounter > 0 ) {
782 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
783 // should do sync writes to the journal.
784 if (LOG.isInfoEnabled()) {
785 long end = System.currentTimeMillis();
786 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
787 }
788 }
789 }
790
791 private Location nextRecoveryPosition;
792 private Location lastRecoveryPosition;
793
794 public void incrementalRecover() throws IOException {
795 this.indexLock.writeLock().lock();
796 try {
797 if( nextRecoveryPosition == null ) {
798 if( lastRecoveryPosition==null ) {
799 nextRecoveryPosition = getRecoveryPosition();
800 } else {
801 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
802 }
803 }
804 while (nextRecoveryPosition != null) {
805 lastRecoveryPosition = nextRecoveryPosition;
806 metadata.lastUpdate = lastRecoveryPosition;
807 JournalCommand<?> message = load(lastRecoveryPosition);
808 process(message, lastRecoveryPosition, (Runnable)null);
809 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
810 }
811 } finally {
812 this.indexLock.writeLock().unlock();
813 }
814 }
815
816 public Location getLastUpdatePosition() throws IOException {
817 return metadata.lastUpdate;
818 }
819
820 private Location getRecoveryPosition() throws IOException {
821
822 if (!this.forceRecoverIndex) {
823
824 // If we need to recover the transactions..
825 if (metadata.firstInProgressTransactionLocation != null) {
826 return metadata.firstInProgressTransactionLocation;
827 }
828
829 // Perhaps there were no transactions...
830 if( metadata.lastUpdate!=null) {
831 // Start replay at the record after the last one recorded in the index file.
832 return journal.getNextLocation(metadata.lastUpdate);
833 }
834 }
835 // This loads the first position.
836 return journal.getNextLocation(null);
837 }
838
839 protected void checkpointCleanup(final boolean cleanup) throws IOException {
840 long start;
841 this.indexLock.writeLock().lock();
842 try {
843 start = System.currentTimeMillis();
844 if( !opened.get() ) {
845 return;
846 }
847 pageFile.tx().execute(new Transaction.Closure<IOException>() {
848 @Override
849 public void execute(Transaction tx) throws IOException {
850 checkpointUpdate(tx, cleanup);
851 }
852 });
853 } finally {
854 this.indexLock.writeLock().unlock();
855 }
856
857 long end = System.currentTimeMillis();
858 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
859 if (LOG.isInfoEnabled()) {
860 LOG.info("Slow KahaDB access: cleanup took " + (end - start));
861 }
862 }
863 }
864
865 public void checkpoint(Callback closure) throws Exception {
866 this.indexLock.writeLock().lock();
867 try {
868 pageFile.tx().execute(new Transaction.Closure<IOException>() {
869 @Override
870 public void execute(Transaction tx) throws IOException {
871 checkpointUpdate(tx, false);
872 }
873 });
874 closure.execute();
875 } finally {
876 this.indexLock.writeLock().unlock();
877 }
878 }
879
880 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
881 int size = data.serializedSizeFramed();
882 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
883 os.writeByte(data.type().getNumber());
884 data.writeFramed(os);
885 return os.toByteSequence();
886 }
887
888 // /////////////////////////////////////////////////////////////////
889 // Methods call by the broker to update and query the store.
890 // /////////////////////////////////////////////////////////////////
891 public Location store(JournalCommand<?> data) throws IOException {
892 return store(data, false, null,null);
893 }
894
895 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
896 return store(data, false, null,null, onJournalStoreComplete);
897 }
898
899 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
900 return store(data, sync, before, after, null);
901 }
902
903 /**
904 * All updated are are funneled through this method. The updates are converted
905 * to a JournalMessage which is logged to the journal and then the data from
906 * the JournalMessage is used to update the index just like it would be done
907 * during a recovery process.
908 */
909 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
910 if (before != null) {
911 before.run();
912 }
913 try {
914 ByteSequence sequence = toByteSequence(data);
915 long start = System.currentTimeMillis();
916 Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
917 long start2 = System.currentTimeMillis();
918 process(data, location, after);
919 long end = System.currentTimeMillis();
920 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
921 if (LOG.isInfoEnabled()) {
922 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
923 }
924 }
925
926 if (after != null) {
927 Runnable afterCompletion = null;
928 synchronized (orderedTransactionAfters) {
929 if (!orderedTransactionAfters.empty()) {
930 afterCompletion = orderedTransactionAfters.pop();
931 }
932 }
933 if (afterCompletion != null) {
934 afterCompletion.run();
935 } else {
936 // non persistent message case
937 after.run();
938 }
939 }
940
941 if (checkpointThread != null && !checkpointThread.isAlive()) {
942 startCheckpoint();
943 }
944 return location;
945 } catch (IOException ioe) {
946 LOG.error("KahaDB failed to store to Journal", ioe);
947 brokerService.handleIOException(ioe);
948 throw ioe;
949 }
950 }
951
952 /**
953 * Loads a previously stored JournalMessage
954 *
955 * @param location
956 * @return
957 * @throws IOException
958 */
959 public JournalCommand<?> load(Location location) throws IOException {
960 long start = System.currentTimeMillis();
961 ByteSequence data = journal.read(location);
962 long end = System.currentTimeMillis();
963 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
964 if (LOG.isInfoEnabled()) {
965 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
966 }
967 }
968 DataByteArrayInputStream is = new DataByteArrayInputStream(data);
969 byte readByte = is.readByte();
970 KahaEntryType type = KahaEntryType.valueOf(readByte);
971 if( type == null ) {
972 throw new IOException("Could not load journal record. Invalid location: "+location);
973 }
974 JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
975 message.mergeFramed(is);
976 return message;
977 }
978
979 /**
980 * do minimal recovery till we reach the last inDoubtLocation
981 * @param data
982 * @param location
983 * @param inDoubtlocation
984 * @throws IOException
985 */
986 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
987 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
988 process(data, location, (Runnable) null);
989 } else {
990 // just recover producer audit
991 data.visit(new Visitor() {
992 @Override
993 public void visit(KahaAddMessageCommand command) throws IOException {
994 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
995 }
996 });
997 }
998 }
999
1000 // /////////////////////////////////////////////////////////////////
1001 // Journaled record processing methods. Once the record is journaled,
1002 // these methods handle applying the index updates. These may be called
1003 // from the recovery method too so they need to be idempotent
1004 // /////////////////////////////////////////////////////////////////
1005
1006 void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException {
1007 data.visit(new Visitor() {
1008 @Override
1009 public void visit(KahaAddMessageCommand command) throws IOException {
1010 process(command, location);
1011 }
1012
1013 @Override
1014 public void visit(KahaRemoveMessageCommand command) throws IOException {
1015 process(command, location);
1016 }
1017
1018 @Override
1019 public void visit(KahaPrepareCommand command) throws IOException {
1020 process(command, location);
1021 }
1022
1023 @Override
1024 public void visit(KahaCommitCommand command) throws IOException {
1025 process(command, location, after);
1026 }
1027
1028 @Override
1029 public void visit(KahaRollbackCommand command) throws IOException {
1030 process(command, location);
1031 }
1032
1033 @Override
1034 public void visit(KahaRemoveDestinationCommand command) throws IOException {
1035 process(command, location);
1036 }
1037
1038 @Override
1039 public void visit(KahaSubscriptionCommand command) throws IOException {
1040 process(command, location);
1041 }
1042
1043 @Override
1044 public void visit(KahaProducerAuditCommand command) throws IOException {
1045 processLocation(location);
1046 }
1047
1048 @Override
1049 public void visit(KahaTraceCommand command) {
1050 processLocation(location);
1051 }
1052 });
1053 }
1054
1055 @SuppressWarnings("rawtypes")
1056 protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
1057 if (command.hasTransactionInfo()) {
1058 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1059 inflightTx.add(new AddOpperation(command, location));
1060 } else {
1061 this.indexLock.writeLock().lock();
1062 try {
1063 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1064 @Override
1065 public void execute(Transaction tx) throws IOException {
1066 upadateIndex(tx, command, location);
1067 }
1068 });
1069 } finally {
1070 this.indexLock.writeLock().unlock();
1071 }
1072 }
1073 }
1074
1075 @SuppressWarnings("rawtypes")
1076 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1077 if (command.hasTransactionInfo()) {
1078 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1079 inflightTx.add(new RemoveOpperation(command, location));
1080 } else {
1081 this.indexLock.writeLock().lock();
1082 try {
1083 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1084 @Override
1085 public void execute(Transaction tx) throws IOException {
1086 updateIndex(tx, command, location);
1087 }
1088 });
1089 } finally {
1090 this.indexLock.writeLock().unlock();
1091 }
1092 }
1093 }
1094
1095 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1096 this.indexLock.writeLock().lock();
1097 try {
1098 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1099 @Override
1100 public void execute(Transaction tx) throws IOException {
1101 updateIndex(tx, command, location);
1102 }
1103 });
1104 } finally {
1105 this.indexLock.writeLock().unlock();
1106 }
1107 }
1108
1109 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1110 this.indexLock.writeLock().lock();
1111 try {
1112 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1113 @Override
1114 public void execute(Transaction tx) throws IOException {
1115 updateIndex(tx, command, location);
1116 }
1117 });
1118 } finally {
1119 this.indexLock.writeLock().unlock();
1120 }
1121 }
1122
1123 protected void processLocation(final Location location) {
1124 this.indexLock.writeLock().lock();
1125 try {
1126 metadata.lastUpdate = location;
1127 } finally {
1128 this.indexLock.writeLock().unlock();
1129 }
1130 }
1131
1132 private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
1133 private void push(Runnable after) {
1134 if (after != null) {
1135 synchronized (orderedTransactionAfters) {
1136 orderedTransactionAfters.push(after);
1137 }
1138 }
1139 }
1140
1141 @SuppressWarnings("rawtypes")
1142 protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
1143 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1144 List<Operation> inflightTx;
1145 synchronized (inflightTransactions) {
1146 inflightTx = inflightTransactions.remove(key);
1147 if (inflightTx == null) {
1148 inflightTx = preparedTransactions.remove(key);
1149 }
1150 }
1151 if (inflightTx == null) {
1152 if (after != null) {
1153 // since we don't push this after and we may find another, lets run it now
1154 after.run();
1155 }
1156 return;
1157 }
1158
1159 final List<Operation> messagingTx = inflightTx;
1160 this.indexLock.writeLock().lock();
1161 try {
1162 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1163 @Override
1164 public void execute(Transaction tx) throws IOException {
1165 for (Operation op : messagingTx) {
1166 op.execute(tx);
1167 }
1168 }
1169 });
1170 metadata.lastUpdate = location;
1171 push(after);
1172 } finally {
1173 this.indexLock.writeLock().unlock();
1174 }
1175 }
1176
1177 @SuppressWarnings("rawtypes")
1178 protected void process(KahaPrepareCommand command, Location location) {
1179 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1180 synchronized (inflightTransactions) {
1181 List<Operation> tx = inflightTransactions.remove(key);
1182 if (tx != null) {
1183 preparedTransactions.put(key, tx);
1184 }
1185 }
1186 }
1187
1188 @SuppressWarnings("rawtypes")
1189 protected void process(KahaRollbackCommand command, Location location) throws IOException {
1190 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1191 List<Operation> updates = null;
1192 synchronized (inflightTransactions) {
1193 updates = inflightTransactions.remove(key);
1194 if (updates == null) {
1195 updates = preparedTransactions.remove(key);
1196 }
1197 }
1198 if (isRewriteOnRedelivery()) {
1199 persistRedeliveryCount(updates);
1200 }
1201 }
1202
1203 @SuppressWarnings("rawtypes")
1204 private void persistRedeliveryCount(List<Operation> updates) throws IOException {
1205 if (updates != null) {
1206 for (Operation operation : updates) {
1207 operation.getCommand().visit(new Visitor() {
1208 @Override
1209 public void visit(KahaRemoveMessageCommand command) throws IOException {
1210 incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
1211 }
1212 });
1213 }
1214 }
1215 }
1216
1217 abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
1218
1219 // /////////////////////////////////////////////////////////////////
1220 // These methods do the actual index updates.
1221 // /////////////////////////////////////////////////////////////////
1222
1223 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1224 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1225
1226 void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1227 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1228
1229 // Skip adding the message to the index if this is a topic and there are
1230 // no subscriptions.
1231 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1232 return;
1233 }
1234
1235 // Add the message.
1236 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1237 long id = sd.orderIndex.getNextMessageId(priority);
1238 Long previous = sd.locationIndex.put(tx, location, id);
1239 if (previous == null) {
1240 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1241 if (previous == null) {
1242 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1243 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1244 addAckLocationForNewMessage(tx, sd, id);
1245 }
1246 } else {
1247 // If the message ID as indexed, then the broker asked us to
1248 // store a DUP
1249 // message. Bad BOY! Don't do it, and log a warning.
1250 LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
1251 sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1252 sd.locationIndex.remove(tx, location);
1253 rollbackStatsOnDuplicate(command.getDestination());
1254 }
1255 } else {
1256 // restore the previous value.. Looks like this was a redo of a
1257 // previously
1258 // added message. We don't want to assign it a new id as the other
1259 // indexes would
1260 // be wrong..
1261 //
1262 sd.locationIndex.put(tx, location, previous);
1263 }
1264 // record this id in any event, initial send or recovery
1265 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1266 metadata.lastUpdate = location;
1267 }
1268
1269 abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
1270
1271 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1272 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1273 if (!command.hasSubscriptionKey()) {
1274
1275 // In the queue case we just remove the message from the index..
1276 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1277 if (sequenceId != null) {
1278 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1279 if (keys != null) {
1280 sd.locationIndex.remove(tx, keys.location);
1281 recordAckMessageReferenceLocation(ackLocation, keys.location);
1282 } else if (LOG.isDebugEnabled()) {
1283 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId());
1284 }
1285 } else if (LOG.isDebugEnabled()) {
1286 LOG.debug("message not found in sequence id index: " + command.getMessageId());
1287 }
1288 } else {
1289 // In the topic case we need remove the message once it's been acked
1290 // by all the subs
1291 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1292
1293 // Make sure it's a valid message id...
1294 if (sequence != null) {
1295 String subscriptionKey = command.getSubscriptionKey();
1296 if (command.getAck() != UNMATCHED) {
1297 sd.orderIndex.get(tx, sequence);
1298 byte priority = sd.orderIndex.lastGetPriority();
1299 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1300 }
1301 // The following method handles deleting un-referenced messages.
1302 removeAckLocation(tx, sd, subscriptionKey, sequence);
1303 } else if (LOG.isDebugEnabled()) {
1304 LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1305 }
1306
1307 }
1308 metadata.lastUpdate = ackLocation;
1309 }
1310
1311 Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1312 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1313 Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1314 if (referenceFileIds == null) {
1315 referenceFileIds = new HashSet<Integer>();
1316 referenceFileIds.add(messageLocation.getDataFileId());
1317 ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1318 } else {
1319 Integer id = Integer.valueOf(messageLocation.getDataFileId());
1320 if (!referenceFileIds.contains(id)) {
1321 referenceFileIds.add(id);
1322 }
1323 }
1324 }
1325
1326 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1327 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1328 sd.orderIndex.remove(tx);
1329
1330 sd.locationIndex.clear(tx);
1331 sd.locationIndex.unload(tx);
1332 tx.free(sd.locationIndex.getPageId());
1333
1334 sd.messageIdIndex.clear(tx);
1335 sd.messageIdIndex.unload(tx);
1336 tx.free(sd.messageIdIndex.getPageId());
1337
1338 if (sd.subscriptions != null) {
1339 sd.subscriptions.clear(tx);
1340 sd.subscriptions.unload(tx);
1341 tx.free(sd.subscriptions.getPageId());
1342
1343 sd.subscriptionAcks.clear(tx);
1344 sd.subscriptionAcks.unload(tx);
1345 tx.free(sd.subscriptionAcks.getPageId());
1346
1347 sd.ackPositions.clear(tx);
1348 sd.ackPositions.unload(tx);
1349 tx.free(sd.ackPositions.getHeadPageId());
1350 }
1351
1352 String key = key(command.getDestination());
1353 storedDestinations.remove(key);
1354 metadata.destinations.remove(tx, key);
1355 }
1356
1357 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1358 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1359 final String subscriptionKey = command.getSubscriptionKey();
1360
1361 // If set then we are creating it.. otherwise we are destroying the sub
1362 if (command.hasSubscriptionInfo()) {
1363 sd.subscriptions.put(tx, subscriptionKey, command);
1364 long ackLocation=NOT_ACKED;
1365 if (!command.getRetroactive()) {
1366 ackLocation = sd.orderIndex.nextMessageId-1;
1367 } else {
1368 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1369 }
1370 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1371 sd.subscriptionCache.add(subscriptionKey);
1372 } else {
1373 // delete the sub...
1374 sd.subscriptions.remove(tx, subscriptionKey);
1375 sd.subscriptionAcks.remove(tx, subscriptionKey);
1376 sd.subscriptionCache.remove(subscriptionKey);
1377 removeAckLocationsForSub(tx, sd, subscriptionKey);
1378
1379 if (sd.subscriptions.isEmpty(tx)) {
1380 sd.messageIdIndex.clear(tx);
1381 sd.locationIndex.clear(tx);
1382 sd.orderIndex.clear(tx);
1383 }
1384 }
1385 }
1386
1387 /**
1388 * @param tx
1389 * @throws IOException
1390 */
1391 void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1392 LOG.debug("Checkpoint started.");
1393
1394 // reflect last update exclusive of current checkpoint
1395 Location lastUpdate = metadata.lastUpdate;
1396
1397 metadata.state = OPEN_STATE;
1398 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1399 Location[] inProgressTxRange = getInProgressTxLocationRange();
1400 metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1401 tx.store(metadata.page, metadataMarshaller, true);
1402 pageFile.flush();
1403
1404 if( cleanup ) {
1405
1406 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1407 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1408
1409 if (LOG.isTraceEnabled()) {
1410 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1411 }
1412
1413 if (lastUpdate != null) {
1414 gcCandidateSet.remove(lastUpdate.getDataFileId());
1415 }
1416
1417 // Don't GC files under replication
1418 if( journalFilesBeingReplicated!=null ) {
1419 gcCandidateSet.removeAll(journalFilesBeingReplicated);
1420 }
1421
1422 if (metadata.producerSequenceIdTrackerLocation != null) {
1423 gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
1424 }
1425
1426 // Don't GC files referenced by in-progress tx
1427 if (inProgressTxRange[0] != null) {
1428 for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1429 gcCandidateSet.remove(pendingTx);
1430 }
1431 }
1432 if (LOG.isTraceEnabled()) {
1433 LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1434 }
1435
1436 // Go through all the destinations to see if any of them can remove GC candidates.
1437 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1438 if( gcCandidateSet.isEmpty() ) {
1439 break;
1440 }
1441
1442 // Use a visitor to cut down the number of pages that we load
1443 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1444 int last=-1;
1445 @Override
1446 public boolean isInterestedInKeysBetween(Location first, Location second) {
1447 if( first==null ) {
1448 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1449 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1450 subset.remove(second.getDataFileId());
1451 }
1452 return !subset.isEmpty();
1453 } else if( second==null ) {
1454 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1455 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1456 subset.remove(first.getDataFileId());
1457 }
1458 return !subset.isEmpty();
1459 } else {
1460 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1461 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1462 subset.remove(first.getDataFileId());
1463 }
1464 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1465 subset.remove(second.getDataFileId());
1466 }
1467 return !subset.isEmpty();
1468 }
1469 }
1470
1471 @Override
1472 public void visit(List<Location> keys, List<Long> values) {
1473 for (Location l : keys) {
1474 int fileId = l.getDataFileId();
1475 if( last != fileId ) {
1476 gcCandidateSet.remove(fileId);
1477 last = fileId;
1478 }
1479 }
1480 }
1481 });
1482 if (LOG.isTraceEnabled()) {
1483 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1484 }
1485 }
1486
1487 // check we are not deleting file with ack for in-use journal files
1488 if (LOG.isTraceEnabled()) {
1489 LOG.trace("gc candidates: " + gcCandidateSet);
1490 }
1491 final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
1492 Iterator<Integer> candidates = gcCandidateSet.iterator();
1493 while (candidates.hasNext()) {
1494 Integer candidate = candidates.next();
1495 Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
1496 if (referencedFileIds != null) {
1497 for (Integer referencedFileId : referencedFileIds) {
1498 if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
1499 // active file that is not targeted for deletion is referenced so don't delete
1500 candidates.remove();
1501 break;
1502 }
1503 }
1504 if (gcCandidateSet.contains(candidate)) {
1505 ackMessageFileMap.remove(candidate);
1506 } else {
1507 if (LOG.isTraceEnabled()) {
1508 LOG.trace("not removing data file: " + candidate
1509 + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1510 }
1511 }
1512 }
1513 }
1514
1515 if (!gcCandidateSet.isEmpty()) {
1516 if (LOG.isDebugEnabled()) {
1517 LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
1518 }
1519 journal.removeDataFiles(gcCandidateSet);
1520 }
1521 }
1522
1523 LOG.debug("Checkpoint done.");
1524 }
1525
1526 final Runnable nullCompletionCallback = new Runnable() {
1527 @Override
1528 public void run() {
1529 }
1530 };
1531 private Location checkpointProducerAudit() throws IOException {
1532 if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
1533 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1534 ObjectOutputStream oout = new ObjectOutputStream(baos);
1535 oout.writeObject(metadata.producerSequenceIdTracker);
1536 oout.flush();
1537 oout.close();
1538 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1539 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1540 try {
1541 location.getLatch().await();
1542 } catch (InterruptedException e) {
1543 throw new InterruptedIOException(e.toString());
1544 }
1545 return location;
1546 }
1547 return metadata.producerSequenceIdTrackerLocation;
1548 }
1549
1550 public HashSet<Integer> getJournalFilesBeingReplicated() {
1551 return journalFilesBeingReplicated;
1552 }
1553
1554 // /////////////////////////////////////////////////////////////////
1555 // StoredDestination related implementation methods.
1556 // /////////////////////////////////////////////////////////////////
1557
1558 private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1559
1560 class StoredSubscription {
1561 SubscriptionInfo subscriptionInfo;
1562 String lastAckId;
1563 Location lastAckLocation;
1564 Location cursor;
1565 }
1566
1567 static class MessageKeys {
1568 final String messageId;
1569 final Location location;
1570
1571 public MessageKeys(String messageId, Location location) {
1572 this.messageId=messageId;
1573 this.location=location;
1574 }
1575
1576 @Override
1577 public String toString() {
1578 return "["+messageId+","+location+"]";
1579 }
1580 }
1581
1582 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1583 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1584
1585 @Override
1586 public MessageKeys readPayload(DataInput dataIn) throws IOException {
1587 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1588 }
1589
1590 @Override
1591 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1592 dataOut.writeUTF(object.messageId);
1593 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1594 }
1595 }
1596
1597 class LastAck {
1598 long lastAckedSequence;
1599 byte priority;
1600
1601 public LastAck(LastAck source) {
1602 this.lastAckedSequence = source.lastAckedSequence;
1603 this.priority = source.priority;
1604 }
1605
1606 public LastAck() {
1607 this.priority = MessageOrderIndex.HI;
1608 }
1609
1610 public LastAck(long ackLocation) {
1611 this.lastAckedSequence = ackLocation;
1612 this.priority = MessageOrderIndex.LO;
1613 }
1614
1615 public LastAck(long ackLocation, byte priority) {
1616 this.lastAckedSequence = ackLocation;
1617 this.priority = priority;
1618 }
1619
1620 @Override
1621 public String toString() {
1622 return "[" + lastAckedSequence + ":" + priority + "]";
1623 }
1624 }
1625
1626 protected class LastAckMarshaller implements Marshaller<LastAck> {
1627
1628 @Override
1629 public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1630 dataOut.writeLong(object.lastAckedSequence);
1631 dataOut.writeByte(object.priority);
1632 }
1633
1634 @Override
1635 public LastAck readPayload(DataInput dataIn) throws IOException {
1636 LastAck lastAcked = new LastAck();
1637 lastAcked.lastAckedSequence = dataIn.readLong();
1638 if (metadata.version >= 3) {
1639 lastAcked.priority = dataIn.readByte();
1640 }
1641 return lastAcked;
1642 }
1643
1644 @Override
1645 public int getFixedSize() {
1646 return 9;
1647 }
1648
1649 @Override
1650 public LastAck deepCopy(LastAck source) {
1651 return new LastAck(source);
1652 }
1653
1654 @Override
1655 public boolean isDeepCopySupported() {
1656 return true;
1657 }
1658 }
1659
1660 class StoredDestination {
1661
1662 MessageOrderIndex orderIndex = new MessageOrderIndex();
1663 BTreeIndex<Location, Long> locationIndex;
1664 BTreeIndex<String, Long> messageIdIndex;
1665
1666 // These bits are only set for Topics
1667 BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
1668 BTreeIndex<String, LastAck> subscriptionAcks;
1669 HashMap<String, MessageOrderCursor> subscriptionCursors;
1670 ListIndex<String, SequenceSet> ackPositions;
1671
1672 // Transient data used to track which Messages are no longer needed.
1673 final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
1674 final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
1675 }
1676
1677 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1678
1679 @Override
1680 public StoredDestination readPayload(final DataInput dataIn) throws IOException {
1681 final StoredDestination value = new StoredDestination();
1682 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1683 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1684 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1685
1686 if (dataIn.readBoolean()) {
1687 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1688 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
1689 if (metadata.version >= 4) {
1690 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
1691 } else {
1692 // upgrade
1693 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1694 @Override
1695 public void execute(Transaction tx) throws IOException {
1696 BTreeIndex<Long, HashSet<String>> oldAckPositions =
1697 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
1698 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1699 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1700 oldAckPositions.load(tx);
1701
1702 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
1703
1704 // Do the initial build of the data in memory before writing into the store
1705 // based Ack Positions List to avoid a lot of disk thrashing.
1706 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
1707 while (iterator.hasNext()) {
1708 Entry<Long, HashSet<String>> entry = iterator.next();
1709
1710 for(String subKey : entry.getValue()) {
1711 SequenceSet pendingAcks = temp.get(subKey);
1712 if (pendingAcks == null) {
1713 pendingAcks = new SequenceSet();
1714 temp.put(subKey, pendingAcks);
1715 }
1716
1717 pendingAcks.add(entry.getKey());
1718 }
1719 }
1720
1721 // Now move the pending messages to ack data into the store backed
1722 // structure.
1723 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1724 for(String subscriptionKey : temp.keySet()) {
1725 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
1726 }
1727
1728 }
1729 });
1730 }
1731 }
1732 if (metadata.version >= 2) {
1733 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1734 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1735 } else {
1736 // upgrade
1737 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1738 @Override
1739 public void execute(Transaction tx) throws IOException {
1740 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1741 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1742 value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1743 value.orderIndex.lowPriorityIndex.load(tx);
1744
1745 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1746 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1747 value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1748 value.orderIndex.highPriorityIndex.load(tx);
1749 }
1750 });
1751 }
1752
1753 return value;
1754 }
1755
1756 @Override
1757 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
1758 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
1759 dataOut.writeLong(value.locationIndex.getPageId());
1760 dataOut.writeLong(value.messageIdIndex.getPageId());
1761 if (value.subscriptions != null) {
1762 dataOut.writeBoolean(true);
1763 dataOut.writeLong(value.subscriptions.getPageId());
1764 dataOut.writeLong(value.subscriptionAcks.getPageId());
1765 dataOut.writeLong(value.ackPositions.getHeadPageId());
1766 } else {
1767 dataOut.writeBoolean(false);
1768 }
1769 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
1770 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
1771 }
1772 }
1773
1774 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1775 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1776
1777 @Override
1778 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1779 KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1780 rc.mergeFramed((InputStream)dataIn);
1781 return rc;
1782 }
1783
1784 @Override
1785 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1786 object.writeFramed((OutputStream)dataOut);
1787 }
1788 }
1789
1790 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1791 String key = key(destination);
1792 StoredDestination rc = storedDestinations.get(key);
1793 if (rc == null) {
1794 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1795 rc = loadStoredDestination(tx, key, topic);
1796 // Cache it. We may want to remove/unload destinations from the
1797 // cache that are not used for a while
1798 // to reduce memory usage.
1799 storedDestinations.put(key, rc);
1800 }
1801 return rc;
1802 }
1803
1804 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1805 String key = key(destination);
1806 StoredDestination rc = storedDestinations.get(key);
1807 if (rc == null && metadata.destinations.containsKey(tx, key)) {
1808 rc = getStoredDestination(destination, tx);
1809 }
1810 return rc;
1811 }
1812
1813 /**
1814 * @param tx
1815 * @param key
1816 * @param topic
1817 * @return
1818 * @throws IOException
1819 */
1820 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1821 // Try to load the existing indexes..
1822 StoredDestination rc = metadata.destinations.get(tx, key);
1823 if (rc == null) {
1824 // Brand new destination.. allocate indexes for it.
1825 rc = new StoredDestination();
1826 rc.orderIndex.allocate(tx);
1827 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1828 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1829
1830 if (topic) {
1831 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1832 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1833 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1834 }
1835 metadata.destinations.put(tx, key, rc);
1836 }
1837
1838 // Configure the marshalers and load.
1839 rc.orderIndex.load(tx);
1840
1841 // Figure out the next key using the last entry in the destination.
1842 rc.orderIndex.configureLast(tx);
1843
1844 rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
1845 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1846 rc.locationIndex.load(tx);
1847
1848 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1849 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1850 rc.messageIdIndex.load(tx);
1851
1852 // If it was a topic...
1853 if (topic) {
1854
1855 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1856 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1857 rc.subscriptions.load(tx);
1858
1859 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1860 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1861 rc.subscriptionAcks.load(tx);
1862
1863 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
1864 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
1865 rc.ackPositions.load(tx);
1866
1867 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1868
1869 if (metadata.version < 3) {
1870
1871 // on upgrade need to fill ackLocation with available messages past last ack
1872 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1873 Entry<String, LastAck> entry = iterator.next();
1874 for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1875 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1876 Long sequence = orderIterator.next().getKey();
1877 addAckLocation(tx, rc, sequence, entry.getKey());
1878 }
1879 // modify so it is upgraded
1880 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1881 }
1882 }
1883
1884 // Configure the message references index
1885 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
1886 while (subscriptions.hasNext()) {
1887 Entry<String, SequenceSet> subscription = subscriptions.next();
1888 SequenceSet pendingAcks = subscription.getValue();
1889 if (pendingAcks != null && !pendingAcks.isEmpty()) {
1890 Long lastPendingAck = pendingAcks.getTail().getLast();
1891 for(Long sequenceId : pendingAcks) {
1892 Long current = rc.messageReferences.get(sequenceId);
1893 if (current == null) {
1894 current = new Long(0);
1895 }
1896
1897 // We always add a trailing empty entry for the next position to start from
1898 // so we need to ensure we don't count that as a message reference on reload.
1899 if (!sequenceId.equals(lastPendingAck)) {
1900 current = current.longValue() + 1;
1901 }
1902
1903 rc.messageReferences.put(sequenceId, current);
1904 }
1905 }
1906 }
1907
1908 // Configure the subscription cache
1909 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1910 Entry<String, LastAck> entry = iterator.next();
1911 rc.subscriptionCache.add(entry.getKey());
1912 }
1913
1914 if (rc.orderIndex.nextMessageId == 0) {
1915 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1916 if (!rc.subscriptionAcks.isEmpty(tx)) {
1917 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1918 Entry<String, LastAck> entry = iterator.next();
1919 rc.orderIndex.nextMessageId =
1920 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1921 }
1922 }
1923 } else {
1924 // update based on ackPositions for unmatched, last entry is always the next
1925 if (!rc.messageReferences.isEmpty()) {
1926 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
1927 rc.orderIndex.nextMessageId =
1928 Math.max(rc.orderIndex.nextMessageId, nextMessageId);
1929 }
1930 }
1931 }
1932
1933 if (metadata.version < VERSION) {
1934 // store again after upgrade
1935 metadata.destinations.put(tx, key, rc);
1936 }
1937 return rc;
1938 }
1939
1940 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1941 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1942 if (sequences == null) {
1943 sequences = new SequenceSet();
1944 sequences.add(messageSequence);
1945 sd.ackPositions.add(tx, subscriptionKey, sequences);
1946 } else {
1947 sequences.add(messageSequence);
1948 sd.ackPositions.put(tx, subscriptionKey, sequences);
1949 }
1950
1951 Long count = sd.messageReferences.get(messageSequence);
1952 if (count == null) {
1953 count = Long.valueOf(0L);
1954 }
1955 count = count.longValue() + 1;
1956 sd.messageReferences.put(messageSequence, count);
1957 }
1958
1959 // new sub is interested in potentially all existing messages
1960 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1961 SequenceSet allOutstanding = new SequenceSet();
1962 Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
1963 while (iterator.hasNext()) {
1964 SequenceSet set = iterator.next().getValue();
1965 for (Long entry : set) {
1966 allOutstanding.add(entry);
1967 }
1968 }
1969 sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
1970
1971 for (Long ackPosition : allOutstanding) {
1972 Long count = sd.messageReferences.get(ackPosition);
1973 count = count.longValue() + 1;
1974 sd.messageReferences.put(ackPosition, count);
1975 }
1976 }
1977
1978 // on a new message add, all existing subs are interested in this message
1979 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1980 for(String subscriptionKey : sd.subscriptionCache) {
1981 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1982 if (sequences == null) {
1983 sequences = new SequenceSet();
1984 sequences.add(new Sequence(messageSequence, messageSequence + 1));
1985 sd.ackPositions.add(tx, subscriptionKey, sequences);
1986 } else {
1987 sequences.add(new Sequence(messageSequence, messageSequence + 1));
1988 sd.ackPositions.put(tx, subscriptionKey, sequences);
1989 }
1990
1991 Long count = sd.messageReferences.get(messageSequence);
1992 if (count == null) {
1993 count = Long.valueOf(0L);
1994 }
1995 count = count.longValue() + 1;
1996 sd.messageReferences.put(messageSequence, count);
1997 sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
1998 }
1999 }
2000
2001 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2002 if (!sd.ackPositions.isEmpty(tx)) {
2003 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2004 if (sequences == null || sequences.isEmpty()) {
2005 return;
2006 }
2007
2008 ArrayList<Long> unreferenced = new ArrayList<Long>();
2009
2010 for(Long sequenceId : sequences) {
2011 Long references = sd.messageReferences.get(sequenceId);
2012 if (references != null) {
2013 references = references.longValue() - 1;
2014
2015 if (references.longValue() > 0) {
2016 sd.messageReferences.put(sequenceId, references);
2017 } else {
2018 sd.messageReferences.remove(sequenceId);
2019 unreferenced.add(sequenceId);
2020 }
2021 }
2022 }
2023
2024 for(Long sequenceId : unreferenced) {
2025 // Find all the entries that need to get deleted.
2026 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2027 sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2028
2029 // Do the actual deletes.
2030 for (Entry<Long, MessageKeys> entry : deletes) {
2031 sd.locationIndex.remove(tx, entry.getValue().location);
2032 sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2033 sd.orderIndex.remove(tx, entry.getKey());
2034 }
2035 }
2036 }
2037 }
2038
2039 /**
2040 * @param tx
2041 * @param sd
2042 * @param subscriptionKey
2043 * @param messageSequence
2044 * @throws IOException
2045 */
2046 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
2047 // Remove the sub from the previous location set..
2048 if (messageSequence != null) {
2049 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2050 if (range != null && !range.isEmpty()) {
2051 range.remove(messageSequence);
2052 if (!range.isEmpty()) {
2053 sd.ackPositions.put(tx, subscriptionKey, range);
2054 } else {
2055 sd.ackPositions.remove(tx, subscriptionKey);
2056 }
2057
2058 // Check if the message is reference by any other subscription.
2059 Long count = sd.messageReferences.get(messageSequence);
2060 if (count != null){
2061 long references = count.longValue() - 1;
2062 if (references > 0) {
2063 sd.messageReferences.put(messageSequence, Long.valueOf(references));
2064 return;
2065 } else {
2066 sd.messageReferences.remove(messageSequence);
2067 }
2068 }
2069
2070 // Find all the entries that need to get deleted.
2071 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2072 sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2073
2074 // Do the actual deletes.
2075 for (Entry<Long, MessageKeys> entry : deletes) {
2076 sd.locationIndex.remove(tx, entry.getValue().location);
2077 sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2078 sd.orderIndex.remove(tx, entry.getKey());
2079 }
2080 }
2081 }
2082 }
2083
2084 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2085 return sd.subscriptionAcks.get(tx, subscriptionKey);
2086 }
2087
2088 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2089 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2090 if (messageSequences != null) {
2091 long result = messageSequences.rangeSize();
2092 // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2093 return result > 0 ? result - 1 : 0;
2094 }
2095
2096 return 0;
2097 }
2098
2099 private String key(KahaDestination destination) {
2100 return destination.getType().getNumber() + ":" + destination.getName();
2101 }
2102
2103 // /////////////////////////////////////////////////////////////////
2104 // Transaction related implementation methods.
2105 // /////////////////////////////////////////////////////////////////
2106 @SuppressWarnings("rawtypes")
2107 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2108 @SuppressWarnings("rawtypes")
2109 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2110 protected final Set<String> ackedAndPrepared = new HashSet<String>();
2111
2112 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2113 // till then they are skipped by the store.
2114 // 'at most once' XA guarantee
2115 public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2116 this.indexLock.writeLock().lock();
2117 try {
2118 for (MessageAck ack : acks) {
2119 ackedAndPrepared.add(ack.getLastMessageId().toString());
2120 }
2121 } finally {
2122 this.indexLock.writeLock().unlock();
2123 }
2124 }
2125
2126 public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
2127 if (acks != null) {
2128 this.indexLock.writeLock().lock();
2129 try {
2130 for (MessageAck ack : acks) {
2131 ackedAndPrepared.remove(ack.getLastMessageId().toString());
2132 }
2133 } finally {
2134 this.indexLock.writeLock().unlock();
2135 }
2136 }
2137 }
2138
2139 @SuppressWarnings("rawtypes")
2140 private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
2141 TransactionId key = TransactionIdConversion.convert(info);
2142 List<Operation> tx;
2143 synchronized (inflightTransactions) {
2144 tx = inflightTransactions.get(key);
2145 if (tx == null) {
2146 tx = Collections.synchronizedList(new ArrayList<Operation>());
2147 inflightTransactions.put(key, tx);
2148 }
2149 }
2150 return tx;
2151 }
2152
2153 @SuppressWarnings("unused")
2154 private TransactionId key(KahaTransactionInfo transactionInfo) {
2155 return TransactionIdConversion.convert(transactionInfo);
2156 }
2157
2158 abstract class Operation <T extends JournalCommand<T>> {
2159 final T command;
2160 final Location location;
2161
2162 public Operation(T command, Location location) {
2163 this.command = command;
2164 this.location = location;
2165 }
2166
2167 public Location getLocation() {
2168 return location;
2169 }
2170
2171 public T getCommand() {
2172 return command;
2173 }
2174
2175 abstract public void execute(Transaction tx) throws IOException;
2176 }
2177
2178 class AddOpperation extends Operation<KahaAddMessageCommand> {
2179
2180 public AddOpperation(KahaAddMessageCommand command, Location location) {
2181 super(command, location);
2182 }
2183
2184 @Override
2185 public void execute(Transaction tx) throws IOException {
2186 upadateIndex(tx, command, location);
2187 }
2188
2189 }
2190
2191 class RemoveOpperation extends Operation<KahaRemoveMessageCommand> {
2192
2193 public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
2194 super(command, location);
2195 }
2196
2197 @Override
2198 public void execute(Transaction tx) throws IOException {
2199 updateIndex(tx, command, location);
2200 }
2201 }
2202
2203 // /////////////////////////////////////////////////////////////////
2204 // Initialization related implementation methods.
2205 // /////////////////////////////////////////////////////////////////
2206
2207 private PageFile createPageFile() {
2208 PageFile index = new PageFile(directory, "db");
2209 index.setEnableWriteThread(isEnableIndexWriteAsync());
2210 index.setWriteBatchSize(getIndexWriteBatchSize());
2211 index.setPageCacheSize(indexCacheSize);
2212 index.setUseLFRUEviction(isUseIndexLFRUEviction());
2213 index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2214 index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2215 index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2216 index.setEnablePageCaching(isEnableIndexPageCaching());
2217 return index;
2218 }
2219
2220 private Journal createJournal() throws IOException {
2221 Journal manager = new Journal();
2222 manager.setDirectory(directory);
2223 manager.setMaxFileLength(getJournalMaxFileLength());
2224 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2225 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2226 manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2227 manager.setArchiveDataLogs(isArchiveDataLogs());
2228 manager.setSizeAccumulator(journalSize);
2229 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2230 if (getDirectoryArchive() != null) {
2231 IOHelper.mkdirs(getDirectoryArchive());
2232 manager.setDirectoryArchive(getDirectoryArchive());
2233 }
2234 return manager;
2235 }
2236
2237 public int getJournalMaxWriteBatchSize() {
2238 return journalMaxWriteBatchSize;
2239 }
2240
2241 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2242 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2243 }
2244
2245 public File getDirectory() {
2246 return directory;
2247 }
2248
2249 public void setDirectory(File directory) {
2250 this.directory = directory;
2251 }
2252
2253 public boolean isDeleteAllMessages() {
2254 return deleteAllMessages;
2255 }
2256
2257 public void setDeleteAllMessages(boolean deleteAllMessages) {
2258 this.deleteAllMessages = deleteAllMessages;
2259 }
2260
2261 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2262 this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2263 }
2264
2265 public int getIndexWriteBatchSize() {
2266 return setIndexWriteBatchSize;
2267 }
2268
2269 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2270 this.enableIndexWriteAsync = enableIndexWriteAsync;
2271 }
2272
2273 boolean isEnableIndexWriteAsync() {
2274 return enableIndexWriteAsync;
2275 }
2276
2277 public boolean isEnableJournalDiskSyncs() {
2278 return enableJournalDiskSyncs;
2279 }
2280
2281 public void setEnableJournalDiskSyncs(boolean syncWrites) {
2282 this.enableJournalDiskSyncs = syncWrites;
2283 }
2284
2285 public long getCheckpointInterval() {
2286 return checkpointInterval;
2287 }
2288
2289 public void setCheckpointInterval(long checkpointInterval) {
2290 this.checkpointInterval = checkpointInterval;
2291 }
2292
2293 public long getCleanupInterval() {
2294 return cleanupInterval;
2295 }
2296
2297 public void setCleanupInterval(long cleanupInterval) {
2298 this.cleanupInterval = cleanupInterval;
2299 }
2300
2301 public void setJournalMaxFileLength(int journalMaxFileLength) {
2302 this.journalMaxFileLength = journalMaxFileLength;
2303 }
2304
2305 public int getJournalMaxFileLength() {
2306 return journalMaxFileLength;
2307 }
2308
2309 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
2310 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
2311 }
2312
2313 public int getMaxFailoverProducersToTrack() {
2314 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
2315 }
2316
2317 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
2318 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
2319 }
2320
2321 public int getFailoverProducersAuditDepth() {
2322 return this.metadata.producerSequenceIdTracker.getAuditDepth();
2323 }
2324
2325 public PageFile getPageFile() {
2326 if (pageFile == null) {
2327 pageFile = createPageFile();
2328 }
2329 return pageFile;
2330 }
2331
2332 public Journal getJournal() throws IOException {
2333 if (journal == null) {
2334 journal = createJournal();
2335 }
2336 return journal;
2337 }
2338
2339 public boolean isFailIfDatabaseIsLocked() {
2340 return failIfDatabaseIsLocked;
2341 }
2342
2343 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
2344 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
2345 }
2346
2347 public boolean isIgnoreMissingJournalfiles() {
2348 return ignoreMissingJournalfiles;
2349 }
2350
2351 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
2352 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
2353 }
2354
2355 public int getIndexCacheSize() {
2356 return indexCacheSize;
2357 }
2358
2359 public void setIndexCacheSize(int indexCacheSize) {
2360 this.indexCacheSize = indexCacheSize;
2361 }
2362
2363 public boolean isCheckForCorruptJournalFiles() {
2364 return checkForCorruptJournalFiles;
2365 }
2366
2367 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
2368 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
2369 }
2370
2371 public boolean isChecksumJournalFiles() {
2372 return checksumJournalFiles;
2373 }
2374
2375 public void setChecksumJournalFiles(boolean checksumJournalFiles) {
2376 this.checksumJournalFiles = checksumJournalFiles;
2377 }
2378
2379 @Override
2380 public void setBrokerService(BrokerService brokerService) {
2381 this.brokerService = brokerService;
2382 }
2383
2384 /**
2385 * @return the archiveDataLogs
2386 */
2387 public boolean isArchiveDataLogs() {
2388 return this.archiveDataLogs;
2389 }
2390
2391 /**
2392 * @param archiveDataLogs the archiveDataLogs to set
2393 */
2394 public void setArchiveDataLogs(boolean archiveDataLogs) {
2395 this.archiveDataLogs = archiveDataLogs;
2396 }
2397
2398 /**
2399 * @return the directoryArchive
2400 */
2401 public File getDirectoryArchive() {
2402 return this.directoryArchive;
2403 }
2404
2405 /**
2406 * @param directoryArchive the directoryArchive to set
2407 */
2408 public void setDirectoryArchive(File directoryArchive) {
2409 this.directoryArchive = directoryArchive;
2410 }
2411
2412 public boolean isRewriteOnRedelivery() {
2413 return rewriteOnRedelivery;
2414 }
2415
2416 public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
2417 this.rewriteOnRedelivery = rewriteOnRedelivery;
2418 }
2419
2420 public boolean isArchiveCorruptedIndex() {
2421 return archiveCorruptedIndex;
2422 }
2423
2424 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
2425 this.archiveCorruptedIndex = archiveCorruptedIndex;
2426 }
2427
2428 public float getIndexLFUEvictionFactor() {
2429 return indexLFUEvictionFactor;
2430 }
2431
2432 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
2433 this.indexLFUEvictionFactor = indexLFUEvictionFactor;
2434 }
2435
2436 public boolean isUseIndexLFRUEviction() {
2437 return useIndexLFRUEviction;
2438 }
2439
2440 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
2441 this.useIndexLFRUEviction = useIndexLFRUEviction;
2442 }
2443
2444 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
2445 this.enableIndexDiskSyncs = enableIndexDiskSyncs;
2446 }
2447
2448 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
2449 this.enableIndexRecoveryFile = enableIndexRecoveryFile;
2450 }
2451
2452 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
2453 this.enableIndexPageCaching = enableIndexPageCaching;
2454 }
2455
2456 public boolean isEnableIndexDiskSyncs() {
2457 return enableIndexDiskSyncs;
2458 }
2459
2460 public boolean isEnableIndexRecoveryFile() {
2461 return enableIndexRecoveryFile;
2462 }
2463
2464 public boolean isEnableIndexPageCaching() {
2465 return enableIndexPageCaching;
2466 }
2467
2468 // /////////////////////////////////////////////////////////////////
2469 // Internal conversion methods.
2470 // /////////////////////////////////////////////////////////////////
2471
2472 class MessageOrderCursor{
2473 long defaultCursorPosition;
2474 long lowPriorityCursorPosition;
2475 long highPriorityCursorPosition;
2476 MessageOrderCursor(){
2477 }
2478
2479 MessageOrderCursor(long position){
2480 this.defaultCursorPosition=position;
2481 this.lowPriorityCursorPosition=position;
2482 this.highPriorityCursorPosition=position;
2483 }
2484
2485 MessageOrderCursor(MessageOrderCursor other){
2486 this.defaultCursorPosition=other.defaultCursorPosition;
2487 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2488 this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2489 }
2490
2491 MessageOrderCursor copy() {
2492 return new MessageOrderCursor(this);
2493 }
2494
2495 void reset() {
2496 this.defaultCursorPosition=0;
2497 this.highPriorityCursorPosition=0;
2498 this.lowPriorityCursorPosition=0;
2499 }
2500
2501 void increment() {
2502 if (defaultCursorPosition!=0) {
2503 defaultCursorPosition++;
2504 }
2505 if (highPriorityCursorPosition!=0) {
2506 highPriorityCursorPosition++;
2507 }
2508 if (lowPriorityCursorPosition!=0) {
2509 lowPriorityCursorPosition++;
2510 }
2511 }
2512
2513 @Override
2514 public String toString() {
2515 return "MessageOrderCursor:[def:" + defaultCursorPosition
2516 + ", low:" + lowPriorityCursorPosition
2517 + ", high:" + highPriorityCursorPosition + "]";
2518 }
2519
2520 public void sync(MessageOrderCursor other) {
2521 this.defaultCursorPosition=other.defaultCursorPosition;
2522 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2523 this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2524 }
2525 }
2526
2527 class MessageOrderIndex {
2528 static final byte HI = 9;
2529 static final byte LO = 0;
2530 static final byte DEF = 4;
2531
2532 long nextMessageId;
2533 BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
2534 BTreeIndex<Long, MessageKeys> lowPriorityIndex;
2535 BTreeIndex<Long, MessageKeys> highPriorityIndex;
2536 MessageOrderCursor cursor = new MessageOrderCursor();
2537 Long lastDefaultKey;
2538 Long lastHighKey;
2539 Long lastLowKey;
2540 byte lastGetPriority;
2541
2542 MessageKeys remove(Transaction tx, Long key) throws IOException {
2543 MessageKeys result = defaultPriorityIndex.remove(tx, key);
2544 if (result == null && highPriorityIndex!=null) {
2545 result = highPriorityIndex.remove(tx, key);
2546 if (result ==null && lowPriorityIndex!=null) {
2547 result = lowPriorityIndex.remove(tx, key);
2548 }
2549 }
2550 return result;
2551 }
2552
2553 void load(Transaction tx) throws IOException {
2554 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2555 defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2556 defaultPriorityIndex.load(tx);
2557 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2558 lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2559 lowPriorityIndex.load(tx);
2560 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2561 highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2562 highPriorityIndex.load(tx);
2563 }
2564
2565 void allocate(Transaction tx) throws IOException {
2566 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2567 if (metadata.version >= 2) {
2568 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2569 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2570 }
2571 }
2572
2573 void configureLast(Transaction tx) throws IOException {
2574 // Figure out the next key using the last entry in the destination.
2575 if (highPriorityIndex != null) {
2576 Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2577 if (lastEntry != null) {
2578 nextMessageId = lastEntry.getKey() + 1;
2579 } else {
2580 lastEntry = defaultPriorityIndex.getLast(tx);
2581 if (lastEntry != null) {
2582 nextMessageId = lastEntry.getKey() + 1;
2583 } else {
2584 lastEntry = lowPriorityIndex.getLast(tx);
2585 if (lastEntry != null) {
2586 nextMessageId = lastEntry.getKey() + 1;
2587 }
2588 }
2589 }
2590 } else {
2591 Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2592 if (lastEntry != null) {
2593 nextMessageId = lastEntry.getKey() + 1;
2594 }
2595 }
2596 }
2597
2598 void clear(Transaction tx) throws IOException {
2599 this.remove(tx);
2600 this.resetCursorPosition();
2601 this.allocate(tx);
2602 this.load(tx);
2603 this.configureLast(tx);
2604 }
2605
2606 void remove(Transaction tx) throws IOException {
2607 defaultPriorityIndex.clear(tx);
2608 defaultPriorityIndex.unload(tx);
2609 tx.free(defaultPriorityIndex.getPageId());
2610 if (lowPriorityIndex != null) {
2611 lowPriorityIndex.clear(tx);
2612 lowPriorityIndex.unload(tx);
2613
2614 tx.free(lowPriorityIndex.getPageId());
2615 }
2616 if (highPriorityIndex != null) {
2617 highPriorityIndex.clear(tx);
2618 highPriorityIndex.unload(tx);
2619 tx.free(highPriorityIndex.getPageId());
2620 }
2621 }
2622
2623 void resetCursorPosition() {
2624 this.cursor.reset();
2625 lastDefaultKey = null;
2626 lastHighKey = null;
2627 lastLowKey = null;
2628 }
2629
2630 void setBatch(Transaction tx, Long sequence) throws IOException {
2631 if (sequence != null) {
2632 Long nextPosition = new Long(sequence.longValue() + 1);
2633 if (defaultPriorityIndex.containsKey(tx, sequence)) {
2634 lastDefaultKey = sequence;
2635 cursor.defaultCursorPosition = nextPosition.longValue();
2636 } else if (highPriorityIndex != null) {
2637 if (highPriorityIndex.containsKey(tx, sequence)) {
2638 lastHighKey = sequence;
2639 cursor.highPriorityCursorPosition = nextPosition.longValue();
2640 } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2641 lastLowKey = sequence;
2642 cursor.lowPriorityCursorPosition = nextPosition.longValue();
2643 }
2644 } else {
2645 LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
2646 lastDefaultKey = sequence;
2647 cursor.defaultCursorPosition = nextPosition.longValue();
2648 }
2649 }
2650 }
2651
2652 void setBatch(Transaction tx, LastAck last) throws IOException {
2653 setBatch(tx, last.lastAckedSequence);
2654 if (cursor.defaultCursorPosition == 0
2655 && cursor.highPriorityCursorPosition == 0
2656 && cursor.lowPriorityCursorPosition == 0) {
2657 long next = last.lastAckedSequence + 1;
2658 switch (last.priority) {
2659 case DEF:
2660 cursor.defaultCursorPosition = next;
2661 cursor.highPriorityCursorPosition = next;
2662 break;
2663 case HI:
2664 cursor.highPriorityCursorPosition = next;
2665 break;
2666 case LO:
2667 cursor.lowPriorityCursorPosition = next;
2668 cursor.defaultCursorPosition = next;
2669 cursor.highPriorityCursorPosition = next;
2670 break;
2671 }
2672 }
2673 }
2674
2675 void stoppedIterating() {
2676 if (lastDefaultKey!=null) {
2677 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2678 }
2679 if (lastHighKey!=null) {
2680 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2681 }
2682 if (lastLowKey!=null) {
2683 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2684 }
2685 lastDefaultKey = null;
2686 lastHighKey = null;
2687 lastLowKey = null;
2688 }
2689
2690 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2691 throws IOException {
2692 if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2693 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2694 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2695 getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2696 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2697 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2698 }
2699 }
2700
2701 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2702 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2703
2704 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2705 deletes.add(iterator.next());
2706 }
2707
2708 long getNextMessageId(int priority) {
2709 return nextMessageId++;
2710 }
2711
2712 MessageKeys get(Transaction tx, Long key) throws IOException {
2713 MessageKeys result = defaultPriorityIndex.get(tx, key);
2714 if (result == null) {
2715 result = highPriorityIndex.get(tx, key);
2716 if (result == null) {
2717 result = lowPriorityIndex.get(tx, key);
2718 lastGetPriority = LO;
2719 } else {
2720 lastGetPriority = HI;
2721 }
2722 } else {
2723 lastGetPriority = DEF;
2724 }
2725 return result;
2726 }
2727
2728 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2729 if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2730 return defaultPriorityIndex.put(tx, key, value);
2731 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2732 return highPriorityIndex.put(tx, key, value);
2733 } else {
2734 return lowPriorityIndex.put(tx, key, value);
2735 }
2736 }
2737
2738 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2739 return new MessageOrderIterator(tx,cursor);
2740 }
2741
2742 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2743 return new MessageOrderIterator(tx,m);
2744 }
2745
2746 public byte lastGetPriority() {
2747 return lastGetPriority;
2748 }
2749
2750 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
2751 Iterator<Entry<Long, MessageKeys>>currentIterator;
2752 final Iterator<Entry<Long, MessageKeys>>highIterator;
2753 final Iterator<Entry<Long, MessageKeys>>defaultIterator;
2754 final Iterator<Entry<Long, MessageKeys>>lowIterator;
2755
2756 MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2757 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2758 if (highPriorityIndex != null) {
2759 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2760 } else {
2761 this.highIterator = null;
2762 }
2763 if (lowPriorityIndex != null) {
2764 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2765 } else {
2766 this.lowIterator = null;
2767 }
2768 }
2769
2770 @Override
2771 public boolean hasNext() {
2772 if (currentIterator == null) {
2773 if (highIterator != null) {
2774 if (highIterator.hasNext()) {
2775 currentIterator = highIterator;
2776 return currentIterator.hasNext();
2777 }
2778 if (defaultIterator.hasNext()) {
2779 currentIterator = defaultIterator;
2780 return currentIterator.hasNext();
2781 }
2782 if (lowIterator.hasNext()) {
2783 currentIterator = lowIterator;
2784 return currentIterator.hasNext();
2785 }
2786 return false;
2787 } else {
2788 currentIterator = defaultIterator;
2789 return currentIterator.hasNext();
2790 }
2791 }
2792 if (highIterator != null) {
2793 if (currentIterator.hasNext()) {
2794 return true;
2795 }
2796 if (currentIterator == highIterator) {
2797 if (defaultIterator.hasNext()) {
2798 currentIterator = defaultIterator;
2799 return currentIterator.hasNext();
2800 }
2801 if (lowIterator.hasNext()) {
2802 currentIterator = lowIterator;
2803 return currentIterator.hasNext();
2804 }
2805 return false;
2806 }
2807
2808 if (currentIterator == defaultIterator) {
2809 if (lowIterator.hasNext()) {
2810 currentIterator = lowIterator;
2811 return currentIterator.hasNext();
2812 }
2813 return false;
2814 }
2815 }
2816 return currentIterator.hasNext();
2817 }
2818
2819 @Override
2820 public Entry<Long, MessageKeys> next() {
2821 Entry<Long, MessageKeys> result = currentIterator.next();
2822 if (result != null) {
2823 Long key = result.getKey();
2824 if (highIterator != null) {
2825 if (currentIterator == defaultIterator) {
2826 lastDefaultKey = key;
2827 } else if (currentIterator == highIterator) {
2828 lastHighKey = key;
2829 } else {
2830 lastLowKey = key;
2831 }
2832 } else {
2833 lastDefaultKey = key;
2834 }
2835 }
2836 return result;
2837 }
2838
2839 @Override
2840 public void remove() {
2841 throw new UnsupportedOperationException();
2842 }
2843
2844 }
2845 }
2846
2847 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2848 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2849
2850 @Override
2851 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2852 ByteArrayOutputStream baos = new ByteArrayOutputStream();
2853 ObjectOutputStream oout = new ObjectOutputStream(baos);
2854 oout.writeObject(object);
2855 oout.flush();
2856 oout.close();
2857 byte[] data = baos.toByteArray();
2858 dataOut.writeInt(data.length);
2859 dataOut.write(data);
2860 }
2861
2862 @Override
2863 @SuppressWarnings("unchecked")
2864 public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2865 int dataLen = dataIn.readInt();
2866 byte[] data = new byte[dataLen];
2867 dataIn.readFully(data);
2868 ByteArrayInputStream bais = new ByteArrayInputStream(data);
2869 ObjectInputStream oin = new ObjectInputStream(bais);
2870 try {
2871 return (HashSet<String>) oin.readObject();
2872 } catch (ClassNotFoundException cfe) {
2873 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2874 ioe.initCause(cfe);
2875 throw ioe;
2876 }
2877 }
2878 }
2879 }