Class JobSchedulerStoreImpl
- java.lang.Object
  - org.apache.activemq.util.ServiceSupport
    - org.apache.activemq.broker.LockableServiceSupport
      - org.apache.activemq.store.kahadb.AbstractKahaDBStore
        - org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl
- All Implemented Interfaces:
BrokerServiceAware, Lockable, JobSchedulerStore, Service
public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore
-
-
Field Summary
-
Fields inherited from class org.apache.activemq.store.kahadb.AbstractKahaDBStore
archiveCorruptedIndex, archiveDataLogs, checkForCorruptJournalFiles, checkpointInterval, checkpointLock, checkpointThread, checkpointThreadLock, checksumJournalFiles, cleanupInterval, deleteAllJobs, directory, directoryArchive, enableIndexDiskSyncs, enableIndexPageCaching, enableIndexRecoveryFile, enableIndexWriteAsync, enableJournalDiskSyncs, failIfDatabaseIsLocked, forceRecoverIndex, ignoreMissingJournalfiles, indexCacheSize, indexLFUEvictionFactor, indexLock, indexWriteBatchSize, journal, journalMaxFileLength, journalMaxWriteBatchSize, journalSize, LOG_SLOW_ACCESS_TIME, opened, pageFile, PROPERTY_LOG_SLOW_ACCESS_TIME, purgeStoreOnStartup, useIndexLFRUEviction
-
Fields inherited from class org.apache.activemq.broker.LockableServiceSupport
brokerService, clockDaemon
-
-
Constructor Summary
Constructor and Description
JobSchedulerStoreImpl()
-
Method Summary
Modifier and Type, Method, and Description
- protected void checkpointUpdate(Transaction tx, boolean cleanup)
  Perform the checkpoint update operation.
- protected void decrementJournalCount(Transaction tx, HashMap<Integer,Integer> decrementsByFileIds)
  Removes multiple references for the Journal log files indicated in the given map of file ids.
- protected void decrementJournalCount(Transaction tx, Location location)
  Removes one reference for the Journal log file indicated in the given Location value.
- protected void doRecover(JournalCommand<?> data, Location location, Location inDoubtlocation)
  Called during index recovery to rebuild the index from the last known good location.
- protected File getDefaultDataDirectory()
- JobScheduler getJobScheduler(String name)
  Returns the JobScheduler instance identified by the given name.
- File getLegacyStoreArchiveDirectory()
  Gets the directory where the legacy Scheduler Store files will be archived if the broker is started and an existing Job Scheduler Store from an old version is detected.
- protected String getPageFileName()
- protected ByteSequence getPayload(Location location)
  Retrieve the scheduled Job's byte blob from the journal.
- protected void incrementJournalCount(Transaction tx, Location location)
  Adds a reference for the journal log file pointed to by the given Location value.
- void load()
  Loads the store from disk.
- protected void process(JournalCommand<?> data, Location location)
  Called during recovery to allow the store to rebuild from scratch.
- protected void processLocation(Location location)
- void readLockIndex()
- void readUnlockIndex()
- protected void referenceRemovedLocation(Transaction tx, Location location, List<Integer> removedJobsFileId)
  Updates the Job removal tracking index with the location of a remove command and the original JobLocation entry.
- protected void referenceRemovedLocation(Transaction tx, Location location, org.apache.activemq.store.kahadb.scheduler.JobLocation removedJob)
  Updates the Job removal tracking index with the location of a remove command and the original JobLocation entry.
- boolean removeJobScheduler(String name)
  Removes the named JobScheduler if it exists, purging all scheduled messages assigned to it.
- void setLegacyStoreArchiveDirectory(File directory)
  Sets the directory where the legacy scheduler store files are archived before an update attempt is made.
- String toString()
- void unload()
  Unloads the state of the Store to disk and shuts down all resources assigned to this KahaDB store implementation.
- void writeLockIndex()
- void writeUnlockIndex()
-
Methods inherited from class org.apache.activemq.store.kahadb.AbstractKahaDBStore
checkpointCleanup, checkpointUpdate, createDefaultLocker, createJournal, createPageFile, doStart, doStop, getCheckpointInterval, getCleanupInterval, getCleanupOnStop, getDirectory, getDirectoryArchive, getIndexCacheSize, getIndexLFUEvictionFactor, getIndexWriteBatchSize, getJournal, getJournalMaxFileLength, getJournalMaxWriteBatchSize, getPageFile, init, isArchiveCorruptedIndex, isArchiveDataLogs, isCheckForCorruptJournalFiles, isChecksumJournalFiles, isDeleteAllJobs, isEnableIndexDiskSyncs, isEnableIndexPageCaching, isEnableIndexRecoveryFile, isEnableIndexWriteAsync, isEnableJournalDiskSyncs, isFailIfDatabaseIsLocked, isForceRecoverIndex, isIgnoreMissingJournalfiles, isPurgeStoreOnStartup, isUseIndexLFRUEviction, load, setArchiveCorruptedIndex, setArchiveDataLogs, setCheckForCorruptJournalFiles, setCheckpointInterval, setChecksumJournalFiles, setCleanupInterval, setCleanupOnStop, setDeleteAllJobs, setDirectory, setDirectoryArchive, setEnableIndexDiskSyncs, setEnableIndexPageCaching, setEnableIndexRecoveryFile, setEnableIndexWriteAsync, setEnableJournalDiskSyncs, setFailIfDatabaseIsLocked, setForceRecoverIndex, setIgnoreMissingJournalfiles, setIndexCacheSize, setIndexLFUEvictionFactor, setIndexWriteBatchSize, setJournalMaxFileLength, setJournalMaxWriteBatchSize, setPurgeStoreOnStartup, setUseIndexLFRUEviction, size, startCheckpoint, store, store, store, store, store, toByteSequence
-
Methods inherited from class org.apache.activemq.broker.LockableServiceSupport
getBrokerService, getLocker, getLockKeepAlivePeriod, getScheduledThreadPoolExecutor, isStopOnError, isUseLock, keepLockAlive, postStop, preStart, setBrokerService, setLocker, setLockKeepAlivePeriod, setScheduledThreadPoolExecutor, setStopOnError, setUseLock, stopBroker
-
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, removeServiceListener, start, stop
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.apache.activemq.broker.scheduler.JobSchedulerStore
getDirectory, setDirectory, size
-
-
-
-
Method Detail
-
getJobScheduler
public JobScheduler getJobScheduler(String name) throws Exception
Description copied from interface: JobSchedulerStore
Returns the JobScheduler instance identified by the given name.
- Specified by:
getJobScheduler in interface JobSchedulerStore
- Parameters:
name - the name of the JobScheduler instance to look up.
- Returns:
the named JobScheduler or null if none exists with the given name.
- Throws:
Exception - if an error occurs while loading the named scheduler.
-
removeJobScheduler
public boolean removeJobScheduler(String name) throws Exception
Description copied from interface: JobSchedulerStore
Removes the named JobScheduler if it exists, purging all scheduled messages assigned to it.
- Specified by:
removeJobScheduler in interface JobSchedulerStore
- Parameters:
name - the name of the scheduler instance to remove.
- Returns:
true if there was a scheduler with the given name to remove.
- Throws:
Exception - if an error occurs while removing the scheduler.
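The lookup and removal contracts documented above (null when no scheduler has the given name, true only when a scheduler was actually removed) can be sketched with an in-memory stand-in. `NamedSchedulerRegistry`, its `register` helper, and the `String` values are hypothetical and are not part of ActiveMQ; the sketch only mirrors the documented return values and says nothing about how the real store persists schedulers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in; not an ActiveMQ class.
class NamedSchedulerRegistry {
    private final Map<String, String> schedulers = new HashMap<>();

    // Hypothetical helper used only to populate the sketch.
    void register(String name, String scheduler) {
        schedulers.put(name, scheduler);
    }

    // Mirrors the documented lookup contract: null when the name is unknown.
    String get(String name) {
        return schedulers.get(name);
    }

    // Mirrors the documented removal contract: true only if an entry was removed.
    boolean remove(String name) {
        return schedulers.remove(name) != null;
    }
}
```

A caller following this contract would check the lookup result for null before use and treat a false return from removal as "nothing to remove" rather than as an error.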
-
setLegacyStoreArchiveDirectory
public void setLegacyStoreArchiveDirectory(File directory)
Sets the directory where the legacy scheduler store files are archived before an update attempt is made. Both the legacy index files and the journal files are moved to this folder prior to an upgrade attempt.
- Parameters:
directory - The directory to move the legacy Scheduler Store files to.
-
getLegacyStoreArchiveDirectory
public File getLegacyStoreArchiveDirectory()
Gets the directory where the legacy Scheduler Store files will be archived if the broker is started and an existing Job Scheduler Store from an old version is detected.
- Returns:
the directory where scheduler store legacy files are archived on upgrade.
-
load
public void load() throws IOException
Description copied from class: AbstractKahaDBStore
Loads the store from disk. Based on configuration, this method can either load an existing store or purge an existing store and start in a clean state.
- Specified by:
load in class AbstractKahaDBStore
- Throws:
IOException - if an error occurs during the load.
-
unload
public void unload() throws IOException
Description copied from class: AbstractKahaDBStore
Unloads the state of the Store to disk and shuts down all resources assigned to this KahaDB store implementation.
- Specified by:
unload in class AbstractKahaDBStore
- Throws:
IOException - if an error occurs during the store unload.
-
checkpointUpdate
protected void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException
Description copied from class: AbstractKahaDBStore
Perform the checkpoint update operation. If the cleanup flag is true then the operation should also purge any unused Journal log files. This method must always be called with the checkpoint and index write locks held.
- Specified by:
checkpointUpdate in class AbstractKahaDBStore
- Parameters:
tx - The TX under which to perform the checkpoint update.
cleanup - Whether the checkpoint should also clean up unused Journal files.
- Throws:
IOException - if an error occurs while performing the checkpoint.
-
incrementJournalCount
protected void incrementJournalCount(Transaction tx, Location location) throws IOException
Adds a reference for the journal log file pointed to by the given Location value. To prevent the removal of journal log files that still contain valid data needed for recovery, those logs must have active references. Each Job scheduler should ensure that the logs are accurately referenced.
- Parameters:
tx - The TX under which the update is to be performed.
location - The location value to update the reference count of.
- Throws:
IOException - if an error occurs while updating the journal references table.
-
decrementJournalCount
protected void decrementJournalCount(Transaction tx, Location location) throws IOException
Removes one reference for the Journal log file indicated in the given Location value. The references are used to track which log files cannot be GC'd. When the reference count on a log file reaches zero, the file id is removed from the tracker and the log will be removed on the next checkpoint update.
- Parameters:
tx - The TX under which the update is to be performed.
location - The location value to update the reference count of.
- Throws:
IOException - if an error occurs while updating the journal references table.
-
decrementJournalCount
protected void decrementJournalCount(Transaction tx, HashMap<Integer,Integer> decrementsByFileIds) throws IOException
Removes multiple references for the Journal log files indicated in the given map of file ids. The references are used to track which log files cannot be GC'd. When the reference count on a log file reaches zero, the file id is removed from the tracker and the log will be removed on the next checkpoint update.
- Parameters:
tx - The TX under which the update is to be performed.
decrementsByFileIds - Map indicating how many decrements to apply per fileId.
- Throws:
IOException - if an error occurs while updating the journal references table.
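The reference-counting rule described for incrementJournalCount and both decrementJournalCount overloads can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the store's implementation: `JournalRefCounts` is a hypothetical class, it uses a plain `HashMap` where the real store updates a journal references table under a Transaction, and file ids are plain integers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of per-file reference counts; not the KahaDB index.
class JournalRefCounts {
    private final Map<Integer, Integer> refs = new HashMap<>();

    // One more live entry points into the given journal file.
    void increment(int fileId) {
        refs.merge(fileId, 1, Integer::sum);
    }

    // Drops one reference for the given journal file.
    void decrement(int fileId) {
        release(fileId, 1);
    }

    // Drops several references at once: file id -> number of decrements.
    void decrementAll(Map<Integer, Integer> decrementsByFileId) {
        decrementsByFileId.forEach((fileId, count) -> release(fileId, count));
    }

    // A file with no entry left in the tracker is a candidate for cleanup.
    boolean isReferenced(int fileId) {
        return refs.containsKey(fileId);
    }

    // Removes the file id from the tracker once its count reaches zero.
    private void release(int fileId, int count) {
        refs.computeIfPresent(fileId,
            (id, current) -> current - count > 0 ? current - count : null);
    }
}
```

In this model a file id disappears from the map as soon as its count reaches zero, which corresponds to the point at which the documentation says the log becomes eligible for removal on the next checkpoint update.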
-
referenceRemovedLocation
protected void referenceRemovedLocation(Transaction tx, Location location, org.apache.activemq.store.kahadb.scheduler.JobLocation removedJob) throws IOException
Updates the Job removal tracking index with the location of a remove command and the original JobLocation entry. The JobLocation holds the locations in the logs where the add and update commands for a job are stored. The log file containing the remove command can only be discarded after both the add and latest update log files have also been discarded.
- Parameters:
tx - The TX under which the update is to be performed.
location - The location value to reference a remove command.
removedJob - The original JobLocation instance that holds the add and update locations.
- Throws:
IOException - if an error occurs while updating the remove location tracker.
-
referenceRemovedLocation
protected void referenceRemovedLocation(Transaction tx, Location location, List<Integer> removedJobsFileId) throws IOException
Updates the Job removal tracking index with the location of a remove command and the original JobLocation entry. The JobLocation holds the locations in the logs where the add and update commands for a job are stored. The log file containing the remove command can only be discarded after both the add and latest update log files have also been discarded.
- Parameters:
tx - The TX under which the update is to be performed.
location - The location value to reference a remove command.
removedJobsFileId - List of the journal file ids from the original JobLocation entries that hold the add and update locations.
- Throws:
IOException - if an error occurs while updating the remove location tracker.
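The ordering constraint above (a log file holding a remove command may be discarded only after the log files holding the matching add and update commands are gone) can be modelled in a few lines. `RemoveLocationTracker`, `reference`, and `canDiscard` are hypothetical names chosen for this sketch, and the set of live file ids is passed in explicitly; the real store keeps this information in its index.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the remove-location tracker; not the KahaDB index.
class RemoveLocationTracker {
    // File holding remove commands -> files holding the matching add/update commands.
    private final Map<Integer, List<Integer>> tracked = new HashMap<>();

    // Records that a remove command in removeFileId refers to jobs stored in the given files.
    void reference(int removeFileId, List<Integer> removedJobFileIds) {
        tracked.computeIfAbsent(removeFileId, id -> new ArrayList<>())
               .addAll(removedJobFileIds);
    }

    // In this model, the remove file may go only when none of the files it refers to is still live.
    boolean canDiscard(int removeFileId, Set<Integer> liveFileIds) {
        List<Integer> needed = tracked.get(removeFileId);
        return needed == null || Collections.disjoint(needed, liveFileIds);
    }
}
```

Keeping the remove command around until the add and update files are gone matters for recovery: if the remove were discarded first, replaying the journal would see the add again with nothing to cancel it.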
-
getPayload
protected ByteSequence getPayload(Location location) throws IOException
Retrieve the scheduled Job's byte blob from the journal.
- Parameters:
location - The location of the KahaAddScheduledJobCommand that originated the Job.
- Returns:
a ByteSequence containing the payload of the scheduled Job.
- Throws:
IOException - if an error occurs while reading the payload value.
-
readLockIndex
public void readLockIndex()
-
readUnlockIndex
public void readUnlockIndex()
-
writeLockIndex
public void writeLockIndex()
-
writeUnlockIndex
public void writeUnlockIndex()
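This page gives no description for the four index lock methods. Assuming they acquire and release the read and write sides of a read-write lock (the inherited indexLock field suggests this, but the page does not confirm it), callers would normally pair each lock call with its unlock in a finally block. `IndexLockSketch` below is a hypothetical stand-in that shows the pairing with `java.util.concurrent.locks.ReentrantReadWriteLock`.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in that mimics the four lock methods; not an ActiveMQ class.
class IndexLockSketch {
    private final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    private int value;

    void readLockIndex() { indexLock.readLock().lock(); }
    void readUnlockIndex() { indexLock.readLock().unlock(); }
    void writeLockIndex() { indexLock.writeLock().lock(); }
    void writeUnlockIndex() { indexLock.writeLock().unlock(); }

    // Readers may run concurrently with each other; the lock is always released.
    int read() {
        readLockIndex();
        try {
            return value;
        } finally {
            readUnlockIndex();
        }
    }

    // Writers get exclusive access; the lock is released even if the update throws.
    void write(int newValue) {
        writeLockIndex();
        try {
            value = newValue;
        } finally {
            writeUnlockIndex();
        }
    }

    boolean isWriteLocked() {
        return indexLock.isWriteLocked();
    }
}
```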
-
getPageFileName
protected String getPageFileName()
- Specified by:
getPageFileName in class AbstractKahaDBStore
- Returns:
the name to give this store's PageFile instance.
-
getDefaultDataDirectory
protected File getDefaultDataDirectory()
- Specified by:
getDefaultDataDirectory in class AbstractKahaDBStore
- Returns:
the location of the data directory if not set by configuration.
-
doRecover
protected void doRecover(JournalCommand<?> data, Location location, Location inDoubtlocation) throws IOException
Called during index recovery to rebuild the index from the last known good location. Entries that occur before the last known good position are ignored.
- Parameters:
data - the command read from the Journal which should be used to update the index.
location - the location in the index where the command was read.
inDoubtlocation - the location in the index known to be the last time the index was valid.
- Throws:
IOException - if an error occurs while recovering the index.
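The filtering rule described for doRecover can be sketched as a comparison of journal positions. `JournalPosition` and `RecoverySketch` are hypothetical types for illustration: positions are ordered by file id and then by offset, and replaying the entry located exactly at the in-doubt position is an assumption of this sketch rather than something this page states.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical journal position: ordered by data file id, then by offset in the file.
class JournalPosition implements Comparable<JournalPosition> {
    final int fileId;
    final int offset;

    JournalPosition(int fileId, int offset) {
        this.fileId = fileId;
        this.offset = offset;
    }

    @Override
    public int compareTo(JournalPosition other) {
        int byFile = Integer.compare(fileId, other.fileId);
        return byFile != 0 ? byFile : Integer.compare(offset, other.offset);
    }
}

class RecoverySketch {
    // Returns the positions that would be replayed into the index.
    static List<JournalPosition> toReplay(List<JournalPosition> journal, JournalPosition inDoubt) {
        List<JournalPosition> replay = new ArrayList<>();
        for (JournalPosition position : journal) {
            // Entries before the last known good position are skipped.
            if (position.compareTo(inDoubt) >= 0) {
                replay.add(position);
            }
        }
        return replay;
    }
}
```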
-
process
protected void process(JournalCommand<?> data, Location location) throws IOException
Called during recovery to allow the store to rebuild from scratch.
- Specified by:
process in class AbstractKahaDBStore
- Parameters:
data - The command to process, which was read from the Journal.
location - The location of the command in the Journal.
- Throws:
IOException - if an error occurs during command processing.
-
processLocation
protected void processLocation(Location location)
-
-