org.apache.activemq.store.kahadb
Class MessageDatabase

java.lang.Object
  extended by org.apache.activemq.util.ServiceSupport
      extended by org.apache.activemq.store.kahadb.MessageDatabase
All Implemented Interfaces:
BrokerServiceAware, Service
Direct Known Subclasses:
KahaDBStore

public abstract class MessageDatabase
extends ServiceSupport
implements BrokerServiceAware


Nested Class Summary
protected  class MessageDatabase.LastAckMarshaller
           
protected static class MessageDatabase.MessageKeysMarshaller
           
protected  class MessageDatabase.Metadata
           
protected  class MessageDatabase.StoredDestinationMarshaller
           
 
Field Summary
protected  Set<String> ackedAndPrepared
           
protected  boolean archiveDataLogs
           
protected  BrokerService brokerService
           
protected  Thread checkpointThread
           
static File DEFAULT_DIRECTORY
           
protected  boolean deleteAllMessages
           
protected  File directory
           
protected  File directoryArchive
           
protected  boolean enableJournalDiskSyncs
           
protected  boolean failIfDatabaseIsLocked
           
protected  boolean forceRecoverIndex
           
protected  ReentrantReadWriteLock indexLock
           
protected  org.apache.kahadb.journal.Journal journal
           
protected  AtomicLong journalSize
           
static int LOG_SLOW_ACCESS_TIME
           
protected  MessageDatabase.Metadata metadata
           
protected  org.apache.activemq.store.kahadb.MessageDatabase.MetadataMarshaller metadataMarshaller
           
protected  AtomicBoolean opened
           
protected  org.apache.kahadb.page.PageFile pageFile
           
protected  LinkedHashMap<TransactionId,List<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> preparedTransactions
           
static String PROPERTY_LOG_SLOW_ACCESS_TIME
           
protected static org.apache.activemq.protobuf.Buffer UNMATCHED
           
 
Constructor Summary
MessageDatabase()
           
 
Method Summary
 void checkpoint(Callback closure)
           
protected  void checkpointCleanup(boolean cleanup)
           
 void close()
           
 void doStart()
           
 void doStop(ServiceStopper stopper)
           
 void forgetRecoveredAcks(ArrayList<MessageAck> acks)
           
 long getCheckpointInterval()
           
 long getCleanupInterval()
           
 File getDirectory()
           
 File getDirectoryArchive()
           
protected  org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getExistingStoredDestination(org.apache.activemq.store.kahadb.data.KahaDestination destination, org.apache.kahadb.page.Transaction tx)
           
 int getFailoverProducersAuditDepth()
           
 org.apache.kahadb.journal.Location getFirstInProgressTxLocation()
           
 int getIndexCacheSize()
           
 float getIndexLFUEvictionFactor()
           
 int getIndexWriteBatchSize()
           
 org.apache.kahadb.journal.Journal getJournal()
           
 HashSet<Integer> getJournalFilesBeingReplicated()
           
 int getJournalMaxFileLength()
           
 int getJournalMaxWriteBatchSize()
           
 org.apache.activemq.store.kahadb.MessageDatabase.LastAck getLastAck(org.apache.kahadb.page.Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey)
           
 org.apache.kahadb.journal.Location getLastUpdatePosition()
           
 int getMaxFailoverProducersToTrack()
           
 org.apache.kahadb.page.PageFile getPageFile()
           
protected  org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getStoredDestination(org.apache.activemq.store.kahadb.data.KahaDestination destination, org.apache.kahadb.page.Transaction tx)
           
 long getStoredMessageCount(org.apache.kahadb.page.Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey)
           
 void incrementalRecover()
           
 boolean isArchiveCorruptedIndex()
           
 boolean isArchiveDataLogs()
           
 boolean isCheckForCorruptJournalFiles()
           
 boolean isChecksumJournalFiles()
           
 boolean isDeleteAllMessages()
           
 boolean isEnableIndexDiskSyncs()
           
 boolean isEnableIndexPageCaching()
           
 boolean isEnableIndexRecoveryFile()
           
 boolean isEnableJournalDiskSyncs()
           
 boolean isFailIfDatabaseIsLocked()
           
 boolean isIgnoreMissingJournalfiles()
           
 boolean isRewriteOnRedelivery()
           
 boolean isUseIndexLFRUEviction()
           
 void load()
           
 JournalCommand<?> load(org.apache.kahadb.journal.Location location)
          Loads a previously stored JournalMessage
 void open()
           
protected  void process(org.apache.activemq.store.kahadb.data.KahaAddMessageCommand command, org.apache.kahadb.journal.Location location)
           
protected  void process(org.apache.activemq.store.kahadb.data.KahaCommitCommand command, org.apache.kahadb.journal.Location location, Runnable after)
           
protected  void process(org.apache.activemq.store.kahadb.data.KahaPrepareCommand command, org.apache.kahadb.journal.Location location)
           
protected  void process(org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand command, org.apache.kahadb.journal.Location location)
           
protected  void process(org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand command, org.apache.kahadb.journal.Location location)
           
protected  void process(org.apache.activemq.store.kahadb.data.KahaRollbackCommand command, org.apache.kahadb.journal.Location location)
           
protected  void process(org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand command, org.apache.kahadb.journal.Location location)
           
protected  void processLocation(org.apache.kahadb.journal.Location location)
           
protected  void recoverIndex(org.apache.kahadb.page.Transaction tx)
           
 void setArchiveCorruptedIndex(boolean archiveCorruptedIndex)
           
 void setArchiveDataLogs(boolean archiveDataLogs)
           
 void setBrokerService(BrokerService brokerService)
           
 void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
           
 void setCheckpointInterval(long checkpointInterval)
           
 void setChecksumJournalFiles(boolean checksumJournalFiles)
           
 void setCleanupInterval(long cleanupInterval)
           
 void setDeleteAllMessages(boolean deleteAllMessages)
           
 void setDirectory(File directory)
           
 void setDirectoryArchive(File directoryArchive)
           
 void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs)
           
 void setEnableIndexPageCaching(boolean enableIndexPageCaching)
           
 void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile)
           
 void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
           
 void setEnableJournalDiskSyncs(boolean syncWrites)
           
 void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)
           
 void setFailoverProducersAuditDepth(int failoverProducersAuditDepth)
           
 void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
           
 void setIndexCacheSize(int indexCacheSize)
           
 void setIndexLFUEvictionFactor(float indexLFUEvictionFactor)
           
 void setIndexWriteBatchSize(int setIndexWriteBatchSize)
           
 void setJournalMaxFileLength(int journalMaxFileLength)
           
 void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
           
 void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack)
           
 void setRewriteOnRedelivery(boolean rewriteOnRedelivery)
           
 void setUseIndexLFRUEviction(boolean useIndexLFRUEviction)
           
 org.apache.kahadb.journal.Location store(JournalCommand<?> data)
           
 org.apache.kahadb.journal.Location store(JournalCommand<?> data, boolean sync, Runnable before, Runnable after)
           
 org.apache.kahadb.journal.Location store(JournalCommand<?> data, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete)
          All updates are funneled through this method.
 org.apache.kahadb.journal.Location store(JournalCommand<?> data, Runnable onJournalStoreComplete)
           
 org.apache.kahadb.util.ByteSequence toByteSequence(JournalCommand<?> data)
           
 void trackRecoveredAcks(ArrayList<MessageAck> acks)
           
 void unload()
           
 
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, postStop, preStart, removeServiceListener, start, stop
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

brokerService

protected BrokerService brokerService

PROPERTY_LOG_SLOW_ACCESS_TIME

public static final String PROPERTY_LOG_SLOW_ACCESS_TIME
See Also:
Constant Field Values

LOG_SLOW_ACCESS_TIME

public static final int LOG_SLOW_ACCESS_TIME

DEFAULT_DIRECTORY

public static final File DEFAULT_DIRECTORY

UNMATCHED

protected static final org.apache.activemq.protobuf.Buffer UNMATCHED

pageFile

protected org.apache.kahadb.page.PageFile pageFile

journal

protected org.apache.kahadb.journal.Journal journal

metadata

protected MessageDatabase.Metadata metadata

metadataMarshaller

protected org.apache.activemq.store.kahadb.MessageDatabase.MetadataMarshaller metadataMarshaller

failIfDatabaseIsLocked

protected boolean failIfDatabaseIsLocked

deleteAllMessages

protected boolean deleteAllMessages

directory

protected File directory

checkpointThread

protected Thread checkpointThread

enableJournalDiskSyncs

protected boolean enableJournalDiskSyncs

archiveDataLogs

protected boolean archiveDataLogs

directoryArchive

protected File directoryArchive

journalSize

protected AtomicLong journalSize

opened

protected AtomicBoolean opened

forceRecoverIndex

protected boolean forceRecoverIndex

indexLock

protected final ReentrantReadWriteLock indexLock

preparedTransactions

protected final LinkedHashMap<TransactionId,List<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> preparedTransactions

ackedAndPrepared

protected final Set<String> ackedAndPrepared
Constructor Detail

MessageDatabase

public MessageDatabase()
Method Detail

doStart

public void doStart()
             throws Exception
Specified by:
doStart in class ServiceSupport
Throws:
Exception

doStop

public void doStop(ServiceStopper stopper)
            throws Exception
Specified by:
doStop in class ServiceSupport
Throws:
Exception

open

public void open()
          throws IOException
Throws:
IOException

load

public void load()
          throws IOException
Throws:
IOException

close

public void close()
           throws IOException,
                  InterruptedException
Throws:
IOException
InterruptedException

unload

public void unload()
            throws IOException,
                   InterruptedException
Throws:
IOException
InterruptedException

getFirstInProgressTxLocation

public org.apache.kahadb.journal.Location getFirstInProgressTxLocation()

recoverIndex

protected void recoverIndex(org.apache.kahadb.page.Transaction tx)
                     throws IOException
Throws:
IOException

incrementalRecover

public void incrementalRecover()
                        throws IOException
Throws:
IOException

getLastUpdatePosition

public org.apache.kahadb.journal.Location getLastUpdatePosition()
                                                         throws IOException
Throws:
IOException

checkpointCleanup

protected void checkpointCleanup(boolean cleanup)
                          throws IOException
Throws:
IOException

checkpoint

public void checkpoint(Callback closure)
                throws Exception
Throws:
Exception

toByteSequence

public org.apache.kahadb.util.ByteSequence toByteSequence(JournalCommand<?> data)
                                                   throws IOException
Throws:
IOException

store

public org.apache.kahadb.journal.Location store(JournalCommand<?> data)
                                         throws IOException
Throws:
IOException

store

public org.apache.kahadb.journal.Location store(JournalCommand<?> data,
                                                Runnable onJournalStoreComplete)
                                         throws IOException
Throws:
IOException

store

public org.apache.kahadb.journal.Location store(JournalCommand<?> data,
                                                boolean sync,
                                                Runnable before,
                                                Runnable after)
                                         throws IOException
Throws:
IOException

store

public org.apache.kahadb.journal.Location store(JournalCommand<?> data,
                                                boolean sync,
                                                Runnable before,
                                                Runnable after,
                                                Runnable onJournalStoreComplete)
                                         throws IOException
All updates are funneled through this method. The updates are converted to a JournalMessage, which is logged to the journal; the data from the JournalMessage is then used to update the index, just as it would be during a recovery process.

Throws:
IOException

load

public JournalCommand<?> load(org.apache.kahadb.journal.Location location)
                       throws IOException
Loads a previously stored JournalMessage

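Parameters:
location - the journal location of the previously stored JournalMessage to load
Returns:
the JournalCommand read from the journal at the given location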
Throws:
IOException

process

protected void process(org.apache.activemq.store.kahadb.data.KahaAddMessageCommand command,
                       org.apache.kahadb.journal.Location location)
                throws IOException
Throws:
IOException

process

protected void process(org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand command,
                       org.apache.kahadb.journal.Location location)
                throws IOException
Throws:
IOException

process

protected void process(org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand command,
                       org.apache.kahadb.journal.Location location)
                throws IOException
Throws:
IOException

process

protected void process(org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand command,
                       org.apache.kahadb.journal.Location location)
                throws IOException
Throws:
IOException

processLocation

protected void processLocation(org.apache.kahadb.journal.Location location)

process

protected void process(org.apache.activemq.store.kahadb.data.KahaCommitCommand command,
                       org.apache.kahadb.journal.Location location,
                       Runnable after)
                throws IOException
Throws:
IOException

process

protected void process(org.apache.activemq.store.kahadb.data.KahaPrepareCommand command,
                       org.apache.kahadb.journal.Location location)

process

protected void process(org.apache.activemq.store.kahadb.data.KahaRollbackCommand command,
                       org.apache.kahadb.journal.Location location)
                throws IOException
Throws:
IOException

getJournalFilesBeingReplicated

public HashSet<Integer> getJournalFilesBeingReplicated()

getStoredDestination

protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getStoredDestination(org.apache.activemq.store.kahadb.data.KahaDestination destination,
                                                                                                  org.apache.kahadb.page.Transaction tx)
                                                                                           throws IOException
Throws:
IOException

getExistingStoredDestination

protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getExistingStoredDestination(org.apache.activemq.store.kahadb.data.KahaDestination destination,
                                                                                                          org.apache.kahadb.page.Transaction tx)
                                                                                                   throws IOException
Throws:
IOException

getLastAck

public org.apache.activemq.store.kahadb.MessageDatabase.LastAck getLastAck(org.apache.kahadb.page.Transaction tx,
                                                                           org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
                                                                           String subscriptionKey)
                                                                    throws IOException
Throws:
IOException

getStoredMessageCount

public long getStoredMessageCount(org.apache.kahadb.page.Transaction tx,
                                  org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
                                  String subscriptionKey)
                           throws IOException
Throws:
IOException

trackRecoveredAcks

public void trackRecoveredAcks(ArrayList<MessageAck> acks)

forgetRecoveredAcks

public void forgetRecoveredAcks(ArrayList<MessageAck> acks)
                         throws IOException
Throws:
IOException

getJournalMaxWriteBatchSize

public int getJournalMaxWriteBatchSize()

setJournalMaxWriteBatchSize

public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)

getDirectory

public File getDirectory()

setDirectory

public void setDirectory(File directory)

isDeleteAllMessages

public boolean isDeleteAllMessages()

setDeleteAllMessages

public void setDeleteAllMessages(boolean deleteAllMessages)

setIndexWriteBatchSize

public void setIndexWriteBatchSize(int setIndexWriteBatchSize)

getIndexWriteBatchSize

public int getIndexWriteBatchSize()

setEnableIndexWriteAsync

public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)

isEnableJournalDiskSyncs

public boolean isEnableJournalDiskSyncs()

setEnableJournalDiskSyncs

public void setEnableJournalDiskSyncs(boolean syncWrites)

getCheckpointInterval

public long getCheckpointInterval()

setCheckpointInterval

public void setCheckpointInterval(long checkpointInterval)

getCleanupInterval

public long getCleanupInterval()

setCleanupInterval

public void setCleanupInterval(long cleanupInterval)

setJournalMaxFileLength

public void setJournalMaxFileLength(int journalMaxFileLength)

getJournalMaxFileLength

public int getJournalMaxFileLength()

setMaxFailoverProducersToTrack

public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack)

getMaxFailoverProducersToTrack

public int getMaxFailoverProducersToTrack()

setFailoverProducersAuditDepth

public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth)

getFailoverProducersAuditDepth

public int getFailoverProducersAuditDepth()

getPageFile

public org.apache.kahadb.page.PageFile getPageFile()

getJournal

public org.apache.kahadb.journal.Journal getJournal()
                                             throws IOException
Throws:
IOException

isFailIfDatabaseIsLocked

public boolean isFailIfDatabaseIsLocked()

setFailIfDatabaseIsLocked

public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)

isIgnoreMissingJournalfiles

public boolean isIgnoreMissingJournalfiles()

setIgnoreMissingJournalfiles

public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)

getIndexCacheSize

public int getIndexCacheSize()

setIndexCacheSize

public void setIndexCacheSize(int indexCacheSize)

isCheckForCorruptJournalFiles

public boolean isCheckForCorruptJournalFiles()

setCheckForCorruptJournalFiles

public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)

isChecksumJournalFiles

public boolean isChecksumJournalFiles()

setChecksumJournalFiles

public void setChecksumJournalFiles(boolean checksumJournalFiles)

setBrokerService

public void setBrokerService(BrokerService brokerService)
Specified by:
setBrokerService in interface BrokerServiceAware

isArchiveDataLogs

public boolean isArchiveDataLogs()
Returns:
the archiveDataLogs

setArchiveDataLogs

public void setArchiveDataLogs(boolean archiveDataLogs)
Parameters:
archiveDataLogs - the archiveDataLogs to set

getDirectoryArchive

public File getDirectoryArchive()
Returns:
the directoryArchive

setDirectoryArchive

public void setDirectoryArchive(File directoryArchive)
Parameters:
directoryArchive - the directoryArchive to set

isRewriteOnRedelivery

public boolean isRewriteOnRedelivery()

setRewriteOnRedelivery

public void setRewriteOnRedelivery(boolean rewriteOnRedelivery)

isArchiveCorruptedIndex

public boolean isArchiveCorruptedIndex()

setArchiveCorruptedIndex

public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex)

getIndexLFUEvictionFactor

public float getIndexLFUEvictionFactor()

setIndexLFUEvictionFactor

public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor)

isUseIndexLFRUEviction

public boolean isUseIndexLFRUEviction()

setUseIndexLFRUEviction

public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction)

setEnableIndexDiskSyncs

public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs)

setEnableIndexRecoveryFile

public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile)

setEnableIndexPageCaching

public void setEnableIndexPageCaching(boolean enableIndexPageCaching)

isEnableIndexDiskSyncs

public boolean isEnableIndexDiskSyncs()

isEnableIndexRecoveryFile

public boolean isEnableIndexRecoveryFile()

isEnableIndexPageCaching

public boolean isEnableIndexPageCaching()


Copyright © 2005-2012 The Apache Software Foundation. All Rights Reserved.