Package org.apache.activemq.store.kahadb
Class AbstractKahaDBStore
- java.lang.Object
-
- org.apache.activemq.util.ServiceSupport
-
- org.apache.activemq.broker.LockableServiceSupport
-
- org.apache.activemq.store.kahadb.AbstractKahaDBStore
-
- All Implemented Interfaces:
BrokerServiceAware, Lockable, Service
- Direct Known Subclasses:
JobSchedulerStoreImpl
public abstract class AbstractKahaDBStore extends LockableServiceSupport
-
-
Field Summary
-
Fields inherited from class org.apache.activemq.broker.LockableServiceSupport
brokerService, clockDaemon
-
-
Constructor Summary
Constructor Description
AbstractKahaDBStore()
-
Method Summary
Modifier and Type Method Description
protected void
checkpointCleanup(boolean cleanup)
Called from the worker thread to start a checkpoint.
protected void
checkpointUpdate(boolean cleanup)
Perform a checkpoint operation with optional cleanup.
protected abstract void
checkpointUpdate(Transaction tx, boolean cleanup)
Perform the checkpoint update operation.
Locker
createDefaultLocker()
Create a default locker.
protected Journal
createJournal()
Create a new Journal instance and configure it using the currently set configuration options.
protected PageFile
createPageFile()
Create the PageFile instance and configure it using the configuration options currently set.
protected void
doStart()
protected void
doStop(ServiceStopper stopper)
long
getCheckpointInterval()
long
getCleanupInterval()
boolean
getCleanupOnStop()
protected abstract File
getDefaultDataDirectory()
File
getDirectory()
File
getDirectoryArchive()
int
getIndexCacheSize()
float
getIndexLFUEvictionFactor()
int
getIndexWriteBatchSize()
Journal
getJournal()
int
getJournalMaxFileLength()
int
getJournalMaxWriteBatchSize()
PageFile
getPageFile()
protected abstract String
getPageFileName()
void
init()
Initialize resources before locking.
boolean
isArchiveCorruptedIndex()
boolean
isArchiveDataLogs()
boolean
isCheckForCorruptJournalFiles()
boolean
isChecksumJournalFiles()
boolean
isDeleteAllJobs()
boolean
isEnableIndexDiskSyncs()
boolean
isEnableIndexPageCaching()
boolean
isEnableIndexRecoveryFile()
boolean
isEnableIndexWriteAsync()
boolean
isEnableJournalDiskSyncs()
boolean
isFailIfDatabaseIsLocked()
boolean
isForceRecoverIndex()
boolean
isIgnoreMissingJournalfiles()
boolean
isPurgeStoreOnStartup()
boolean
isUseIndexLFRUEviction()
abstract void
load()
Loads the store from disk.
protected JournalCommand<?>
load(Location location)
Loads a previously stored JournalMessage.
protected abstract void
process(JournalCommand<?> command, Location location)
Process a stored or recovered JournalCommand instance and update the DB Index with the state changes that this command produces.
void
setArchiveCorruptedIndex(boolean archiveCorruptedIndex)
void
setArchiveDataLogs(boolean archiveDataLogs)
void
setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
void
setCheckpointInterval(long checkpointInterval)
void
setChecksumJournalFiles(boolean checksumJournalFiles)
void
setCleanupInterval(long cleanupInterval)
void
setCleanupOnStop(boolean cleanupOnStop)
void
setDeleteAllJobs(boolean deleteAllJobs)
void
setDirectory(File directory)
void
setDirectoryArchive(File directoryArchive)
void
setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs)
void
setEnableIndexPageCaching(boolean enableIndexPageCaching)
void
setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile)
void
setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
void
setEnableJournalDiskSyncs(boolean syncWrites)
void
setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)
void
setForceRecoverIndex(boolean forceRecoverIndex)
void
setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
void
setIndexCacheSize(int indexCacheSize)
void
setIndexLFUEvictionFactor(float indexLFUEvictionFactor)
void
setIndexWriteBatchSize(int indexWriteBatchSize)
void
setJournalMaxFileLength(int journalMaxFileLength)
void
setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
void
setPurgeStoreOnStartup(boolean purge)
void
setUseIndexLFRUEviction(boolean useIndexLFRUEviction)
long
size()
protected void
startCheckpoint()
Starts the checkpoint Thread instance if not already running and not disabled by configuration.
Location
store(JournalCommand<?> command)
Store a command in the Journal and process to update the Store index.
Location
store(JournalCommand<?> command, boolean sync)
Store a command in the Journal and process to update the Store index.
Location
store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after)
Store a command in the Journal and process to update the Store index.
Location
store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete)
All updates are funneled through this method.
Location
store(JournalCommand<?> command, Runnable onJournalStoreComplete)
Store a command in the Journal and process to update the Store index.
protected ByteSequence
toByteSequence(JournalCommand<?> data)
Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
abstract void
unload()
Unloads the state of the Store to disk and shuts down all resources assigned to this KahaDB store implementation.
-
Methods inherited from class org.apache.activemq.broker.LockableServiceSupport
getBrokerService, getLocker, getLockKeepAlivePeriod, getScheduledThreadPoolExecutor, isStopOnError, isUseLock, keepLockAlive, postStop, preStart, setBrokerService, setLocker, setLockKeepAlivePeriod, setScheduledThreadPoolExecutor, setStopOnError, setUseLock, stopBroker
-
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, removeServiceListener, start, stop
-
-
-
-
Field Detail
-
PROPERTY_LOG_SLOW_ACCESS_TIME
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME
- See Also:
- Constant Field Values
-
LOG_SLOW_ACCESS_TIME
public static final int LOG_SLOW_ACCESS_TIME
-
directory
protected File directory
-
pageFile
protected PageFile pageFile
-
journal
protected Journal journal
-
journalSize
protected AtomicLong journalSize
-
failIfDatabaseIsLocked
protected boolean failIfDatabaseIsLocked
-
checkpointInterval
protected long checkpointInterval
-
cleanupInterval
protected long cleanupInterval
-
checkForCorruptJournalFiles
protected boolean checkForCorruptJournalFiles
-
checksumJournalFiles
protected boolean checksumJournalFiles
-
forceRecoverIndex
protected boolean forceRecoverIndex
-
journalMaxFileLength
protected int journalMaxFileLength
-
journalMaxWriteBatchSize
protected int journalMaxWriteBatchSize
-
archiveCorruptedIndex
protected boolean archiveCorruptedIndex
-
enableIndexWriteAsync
protected boolean enableIndexWriteAsync
-
enableJournalDiskSyncs
protected boolean enableJournalDiskSyncs
-
deleteAllJobs
protected boolean deleteAllJobs
-
indexWriteBatchSize
protected int indexWriteBatchSize
-
useIndexLFRUEviction
protected boolean useIndexLFRUEviction
-
indexLFUEvictionFactor
protected float indexLFUEvictionFactor
-
ignoreMissingJournalfiles
protected boolean ignoreMissingJournalfiles
-
indexCacheSize
protected int indexCacheSize
-
enableIndexDiskSyncs
protected boolean enableIndexDiskSyncs
-
enableIndexRecoveryFile
protected boolean enableIndexRecoveryFile
-
enableIndexPageCaching
protected boolean enableIndexPageCaching
-
archiveDataLogs
protected boolean archiveDataLogs
-
purgeStoreOnStartup
protected boolean purgeStoreOnStartup
-
directoryArchive
protected File directoryArchive
-
opened
protected AtomicBoolean opened
-
checkpointThread
protected Thread checkpointThread
-
checkpointThreadLock
protected final Object checkpointThreadLock
-
checkpointLock
protected ReentrantReadWriteLock checkpointLock
-
indexLock
protected ReentrantReadWriteLock indexLock
-
-
Method Detail
-
getPageFileName
protected abstract String getPageFileName()
- Returns:
- the name to give this store's PageFile instance.
-
getDefaultDataDirectory
protected abstract File getDefaultDataDirectory()
- Returns:
- the location of the data directory if not set by configuration.
-
load
public abstract void load() throws IOException
Loads the store from disk. Based on configuration this method can either load an existing store or it can purge an existing store and start in a clean state.
- Throws:
IOException
- if an error occurs during the load.
-
unload
public abstract void unload() throws IOException
Unloads the state of the Store to disk and shuts down all resources assigned to this KahaDB store implementation.
- Throws:
IOException
- if an error occurs during the store unload.
-
doStart
protected void doStart() throws Exception
- Specified by:
doStart
in class ServiceSupport
- Throws:
Exception
-
doStop
protected void doStop(ServiceStopper stopper) throws Exception
- Specified by:
doStop
in class ServiceSupport
- Throws:
Exception
-
getPageFile
public PageFile getPageFile()
-
getJournal
public Journal getJournal() throws IOException
- Throws:
IOException
-
getDirectory
public File getDirectory()
-
setDirectory
public void setDirectory(File directory)
-
isArchiveCorruptedIndex
public boolean isArchiveCorruptedIndex()
-
setArchiveCorruptedIndex
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex)
-
isFailIfDatabaseIsLocked
public boolean isFailIfDatabaseIsLocked()
-
setFailIfDatabaseIsLocked
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)
-
isCheckForCorruptJournalFiles
public boolean isCheckForCorruptJournalFiles()
-
setCheckForCorruptJournalFiles
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
-
getCheckpointInterval
public long getCheckpointInterval()
-
setCheckpointInterval
public void setCheckpointInterval(long checkpointInterval)
-
getCleanupInterval
public long getCleanupInterval()
-
setCleanupInterval
public void setCleanupInterval(long cleanupInterval)
-
setCleanupOnStop
public void setCleanupOnStop(boolean cleanupOnStop)
-
getCleanupOnStop
public boolean getCleanupOnStop()
-
isChecksumJournalFiles
public boolean isChecksumJournalFiles()
-
setChecksumJournalFiles
public void setChecksumJournalFiles(boolean checksumJournalFiles)
-
isForceRecoverIndex
public boolean isForceRecoverIndex()
-
setForceRecoverIndex
public void setForceRecoverIndex(boolean forceRecoverIndex)
-
getJournalMaxFileLength
public int getJournalMaxFileLength()
-
setJournalMaxFileLength
public void setJournalMaxFileLength(int journalMaxFileLength)
-
getJournalMaxWriteBatchSize
public int getJournalMaxWriteBatchSize()
-
setJournalMaxWriteBatchSize
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
-
isEnableIndexWriteAsync
public boolean isEnableIndexWriteAsync()
-
setEnableIndexWriteAsync
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
-
isEnableJournalDiskSyncs
public boolean isEnableJournalDiskSyncs()
-
setEnableJournalDiskSyncs
public void setEnableJournalDiskSyncs(boolean syncWrites)
-
isDeleteAllJobs
public boolean isDeleteAllJobs()
-
setDeleteAllJobs
public void setDeleteAllJobs(boolean deleteAllJobs)
-
isArchiveDataLogs
public boolean isArchiveDataLogs()
- Returns:
- the archiveDataLogs
-
setArchiveDataLogs
public void setArchiveDataLogs(boolean archiveDataLogs)
- Parameters:
archiveDataLogs
- the archiveDataLogs to set
-
getDirectoryArchive
public File getDirectoryArchive()
- Returns:
- the directoryArchive
-
setDirectoryArchive
public void setDirectoryArchive(File directoryArchive)
- Parameters:
directoryArchive
- the directoryArchive to set
-
getIndexCacheSize
public int getIndexCacheSize()
-
setIndexCacheSize
public void setIndexCacheSize(int indexCacheSize)
-
getIndexWriteBatchSize
public int getIndexWriteBatchSize()
-
setIndexWriteBatchSize
public void setIndexWriteBatchSize(int indexWriteBatchSize)
-
isUseIndexLFRUEviction
public boolean isUseIndexLFRUEviction()
-
setUseIndexLFRUEviction
public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction)
-
getIndexLFUEvictionFactor
public float getIndexLFUEvictionFactor()
-
setIndexLFUEvictionFactor
public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor)
-
isEnableIndexDiskSyncs
public boolean isEnableIndexDiskSyncs()
-
setEnableIndexDiskSyncs
public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs)
-
isEnableIndexRecoveryFile
public boolean isEnableIndexRecoveryFile()
-
setEnableIndexRecoveryFile
public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile)
-
isEnableIndexPageCaching
public boolean isEnableIndexPageCaching()
-
setEnableIndexPageCaching
public void setEnableIndexPageCaching(boolean enableIndexPageCaching)
-
isPurgeStoreOnStartup
public boolean isPurgeStoreOnStartup()
-
setPurgeStoreOnStartup
public void setPurgeStoreOnStartup(boolean purge)
-
isIgnoreMissingJournalfiles
public boolean isIgnoreMissingJournalfiles()
-
setIgnoreMissingJournalfiles
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
-
size
public long size()
-
createDefaultLocker
public Locker createDefaultLocker() throws IOException
Description copied from interface: Lockable
Create a default locker.
- Returns:
- default locker
- Throws:
IOException
-
init
public void init() throws Exception
Description copied from class: LockableServiceSupport
Initialize resources before locking.
- Specified by:
init
in class LockableServiceSupport
- Throws:
Exception
-
store
public Location store(JournalCommand<?> command) throws IOException
Store a command in the Journal and process to update the Store index.
- Parameters:
command
- The specific JournalCommand to store and process.
- Throws:
IOException
- if an error occurs storing or processing the command.
-
store
public Location store(JournalCommand<?> command, boolean sync) throws IOException
Store a command in the Journal and process to update the Store index.
- Parameters:
command
- The specific JournalCommand to store and process.
sync
- Whether the store operation should be done synchronously (ignored if a completion callback is passed).
- Throws:
IOException
- if an error occurs storing or processing the command.
-
store
public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException
Store a command in the Journal and process to update the Store index.
- Parameters:
command
- The specific JournalCommand to store and process.
onJournalStoreComplete
- The Runnable to call when the Journal write operation completes.
- Throws:
IOException
- if an error occurs storing or processing the command.
-
store
public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException
Store a command in the Journal and process to update the Store index.
- Parameters:
command
- The specific JournalCommand to store and process.
sync
- Whether the store operation should be done synchronously (ignored if a completion callback is passed).
before
- The Runnable instance to execute before performing the store and process operation.
after
- The Runnable instance to execute after performing the store and process operation.
- Throws:
IOException
- if an error occurs storing or processing the command.
-
store
public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException
All updates are funneled through this method. Each update is converted to a JournalMessage, which is logged to the journal; the data from the JournalMessage is then used to update the index, just as it would be during a recovery process.
- Parameters:
command
- The specific JournalCommand to store and process.
sync
- Whether the store operation should be done synchronously (ignored if a completion callback is passed).
before
- The Runnable instance to execute before performing the store and process operation.
after
- The Runnable instance to execute after performing the store and process operation.
onJournalStoreComplete
- Callback to be run when the journal write operation is complete.
- Throws:
IOException
- if an error occurs storing or processing the command.
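The journal-then-index flow described for this method can be illustrated with a minimal, self-contained sketch. It is not the KahaDB implementation: SketchStore, its string commands ("put k v", "remove k"), the in-memory list standing in for the Journal, and the integer standing in for a Location are all hypothetical. It only shows the shape of the idea: the convenience overload delegates to one funnel method, that method appends to the journal before calling process, and recovery replays the journal through the same process method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a journal-backed store; not the KahaDB classes.
class SketchStore {
    // Append-only log of commands; the list position plays the role of a Location.
    final List<String> journal = new ArrayList<>();
    // Index state that is derived only by processing journal entries.
    final Map<String, String> index = new HashMap<>();

    // Convenience overload: delegates to the single funnel method.
    int store(String command) {
        return store(command, null, null);
    }

    // Funnel method: optional hook, journal write, index update, optional hook.
    int store(String command, Runnable before, Runnable after) {
        if (before != null) {
            before.run();
        }
        journal.add(command);
        int location = journal.size() - 1;
        process(command, location);
        if (after != null) {
            after.run();
        }
        return location;
    }

    // Applies one command to the index; used for live updates and for replay.
    void process(String command, int location) {
        String[] parts = command.split(" ", 3);
        if (parts[0].equals("put")) {
            index.put(parts[1], parts[2]);
        } else if (parts[0].equals("remove")) {
            index.remove(parts[1]);
        }
    }

    // Rebuilds an index from scratch by replaying every journal entry in order.
    static Map<String, String> recover(List<String> journal) {
        SketchStore replayed = new SketchStore();
        for (int i = 0; i < journal.size(); i++) {
            replayed.process(journal.get(i), i);
        }
        return replayed.index;
    }
}
```

Because live updates and replay share the same process method in this sketch, an index rebuilt from the journal matches the live index, which is the property the description above relies on.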
-
load
protected JournalCommand<?> load(Location location) throws IOException
Loads a previously stored JournalMessage.
- Parameters:
location
- The location of the journal command to read.
- Returns:
- a new un-marshaled JournalCommand instance.
- Throws:
IOException
- if an error occurs reading the stored command.
-
process
protected abstract void process(JournalCommand<?> command, Location location) throws IOException
Process a stored or recovered JournalCommand instance and update the DB Index with the state changes that this command produces. This can be called either as a new DB operation or as a replay during recovery operations.
- Parameters:
command
- The JournalCommand to process.
location
- The location in the Journal where the command was written or read from.
- Throws:
IOException
-
checkpointUpdate
protected void checkpointUpdate(boolean cleanup) throws IOException
Perform a checkpoint operation with optional cleanup. Called periodically by the checkpoint background thread to initiate a checkpoint operation. If the cleanup flag is set, a cleanup sweep is also performed to release journal log files that are no longer needed.
- Parameters:
cleanup
- Whether the method should do a simple checkpoint or also perform a journal cleanup.
- Throws:
IOException
- if an error occurs during the checkpoint operation.
-
checkpointUpdate
protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException
Perform the checkpoint update operation. If the cleanup flag is true then the operation should also purge any unused Journal log files. This method must always be called with the checkpoint and index write locks held.
- Parameters:
tx
- The TX under which to perform the checkpoint update.
cleanup
- Whether the checkpoint should also do unused Journal file cleanup.
- Throws:
IOException
- if an error occurs while performing the checkpoint.
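The locking precondition stated above can be sketched as follows. This is an illustration only: CheckpointLockSketch and doCheckpoint are hypothetical names, the acquisition order (checkpoint lock first, then index lock) is an assumption rather than something this page states, and the Transaction argument is omitted. The two lock fields mirror the checkpointLock and indexLock fields listed in Field Detail.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of calling a checkpoint update with both write locks held.
class CheckpointLockSketch {
    final ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
    final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    int checkpoints = 0;
    int cleanups = 0;

    // Outer entry point: takes both write locks, then runs the update.
    // The lock order used here is an assumption made for this sketch.
    void checkpoint(boolean cleanup) {
        checkpointLock.writeLock().lock();
        try {
            indexLock.writeLock().lock();
            try {
                doCheckpoint(cleanup);
            } finally {
                indexLock.writeLock().unlock();
            }
        } finally {
            checkpointLock.writeLock().unlock();
        }
    }

    // Inner update: refuses to run unless the caller holds both write locks.
    void doCheckpoint(boolean cleanup) {
        if (!checkpointLock.isWriteLockedByCurrentThread()
                || !indexLock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("checkpoint and index write locks must be held");
        }
        checkpoints++;
        if (cleanup) {
            cleanups++; // stands in for purging unused journal log files
        }
    }
}
```

The explicit check in doCheckpoint is a teaching device; a subclass implementing the abstract method would typically rely on the caller honoring the documented contract.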
-
toByteSequence
protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException
Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
- Parameters:
data
- The Journal Command that should be marshaled to bytes for writing.
- Returns:
- the byte representation of the given journal command.
- Throws:
IOException
- if an error occurs while serializing the command.
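As a rough illustration of marshaling a command to bytes and reading it back (the counterpart of load(Location)), the sketch below uses a made-up framing: one type byte followed by a length-prefixed string payload. The class MarshalSketch, its method names, and this wire format are assumptions for the example and do not describe the actual KahaDB journal encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical framing for illustration only; not the real journal format.
class MarshalSketch {
    // Writes a type byte followed by the payload in DataOutputStream UTF form.
    static byte[] toBytes(byte type, String payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeByte(type);
            out.writeUTF(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the type byte and payload back and returns them as "type:payload".
    static String fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte type = in.readByte();
            String payload = in.readUTF();
            return type + ":" + payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Leading with a type marker lets a reader decide which command class to instantiate before decoding the rest, which is one common way to make a mixed stream of commands self-describing.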
-
createPageFile
protected PageFile createPageFile()
Create the PageFile instance and configure it using the configuration options currently set.
- Returns:
- the newly created and configured PageFile instance.
-
createJournal
protected Journal createJournal() throws IOException
Create a new Journal instance and configure it using the currently set configuration options. If an archive directory is configured then this method will attempt to create that directory if it does not already exist.
- Returns:
- the newly created and configured Journal instance.
- Throws:
IOException
- if an error occurs while creating the Journal object.
-
startCheckpoint
protected void startCheckpoint()
Starts the checkpoint Thread instance if not already running and not disabled by configuration.
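A minimal sketch of the "start only if not already running and not disabled" behavior is shown below. CheckpointStarterSketch is a hypothetical class; treating both intervals being zero as "disabled by configuration" is an assumption for this example, as are the default interval values. The checkpointThread and checkpointThreadLock names mirror the fields listed in Field Detail.

```java
// Hypothetical sketch of an idempotent background-thread starter.
class CheckpointStarterSketch {
    private final Object checkpointThreadLock = new Object();
    private Thread checkpointThread;
    long checkpointInterval = 5000; // illustrative value, in milliseconds
    long cleanupInterval = 30000;   // illustrative value, in milliseconds

    // Returns true only when a new worker thread was actually started.
    boolean startCheckpoint(Runnable work) {
        // Assumption: zero for both intervals means periodic work is disabled.
        if (checkpointInterval == 0 && cleanupInterval == 0) {
            return false;
        }
        synchronized (checkpointThreadLock) {
            // A live worker already exists, so do not start a second one.
            if (checkpointThread != null && checkpointThread.isAlive()) {
                return false;
            }
            checkpointThread = new Thread(work, "checkpoint-sketch");
            checkpointThread.setDaemon(true);
            checkpointThread.start();
            return true;
        }
    }
}
```

Guarding the check and the start with the same monitor prevents two callers from both observing "no thread" and each starting a worker.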
-
checkpointCleanup
protected void checkpointCleanup(boolean cleanup) throws IOException
Called from the worker thread to start a checkpoint. This method ensures that the store is in an opened state and optionally logs information related to slow store access times.
- Parameters:
cleanup
- Whether a cleanup of the journal should occur during the checkpoint operation.
- Throws:
IOException
- if an error occurs during the checkpoint operation.
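The two behaviors described above (skip the work unless the store is opened, and optionally report slow runs) can be sketched like this. SlowAccessSketch, its threshold parameter, the injected clock, and the warning text are hypothetical; in particular, treating a threshold of zero as "logging disabled" is an assumption, and this page does not state how LOG_SLOW_ACCESS_TIME is interpreted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

// Hypothetical sketch: gate a checkpoint on an "opened" flag and time it.
class SlowAccessSketch {
    final AtomicBoolean opened = new AtomicBoolean(false);
    final List<String> warnings = new ArrayList<>();
    private final long slowThresholdMillis;
    private final LongSupplier clock; // injected so timing is deterministic in tests

    SlowAccessSketch(long slowThresholdMillis, LongSupplier clock) {
        this.slowThresholdMillis = slowThresholdMillis;
        this.clock = clock;
    }

    // Returns false without running anything when the store is not opened.
    boolean checkpointCleanup(Runnable checkpoint) {
        if (!opened.get()) {
            return false;
        }
        long start = clock.getAsLong();
        checkpoint.run();
        long elapsed = clock.getAsLong() - start;
        // Assumption: a threshold of zero disables slow-access reporting.
        if (slowThresholdMillis > 0 && elapsed > slowThresholdMillis) {
            warnings.add("Slow checkpoint took " + elapsed + " ms");
        }
        return true;
    }
}
```

In production code the clock would typically be System::currentTimeMillis and the warning would go to a logger instead of a list.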
-
-