Package org.apache.activemq.store.kahadb
Class KahaDBStore
- java.lang.Object
  - org.apache.activemq.util.ServiceSupport
    - org.apache.activemq.store.kahadb.MessageDatabase
      - org.apache.activemq.store.kahadb.KahaDBStore
- All Implemented Interfaces:
BrokerServiceAware, Service, NoLocalSubscriptionAware, PersistenceAdapter
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware
-
-
Nested Class Summary
Modifier and Type, Class, Description
class
KahaDBStore.KahaDBMessageStore
static interface
KahaDBStore.StoreTask
class
KahaDBStore.StoreTaskExecutor
-
Nested classes/interfaces inherited from class org.apache.activemq.store.kahadb.MessageDatabase
MessageDatabase.LastAckMarshaller, MessageDatabase.MessageKeysMarshaller, MessageDatabase.MessageStoreStatisticsMarshaller, MessageDatabase.Metadata, MessageDatabase.PurgeRecoveredXATransactionStrategy, MessageDatabase.StoredDestinationMarshaller
-
-
Field Summary
Modifier and Type, Field, Description
protected List<Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask>>
asyncQueueMaps
protected List<Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask>>
asyncTopicMaps
static int
cancelledTaskModMetric
static String
PROPERTY_ASYNC_EXECUTOR_MAX_THREADS
static String
PROPERTY_CANCELED_TASK_MOD_METRIC
protected ExecutorService
queueExecutor
protected ExecutorService
topicExecutor
-
Fields inherited from class org.apache.activemq.store.kahadb.MessageDatabase
archiveDataLogs, brokerService, DEFAULT_DIRECTORY, deleteAllMessages, directory, directoryArchive, failIfDatabaseIsLocked, forceRecoverIndex, indexDirectory, indexLock, journal, journalDiskSyncStrategy, journalSize, lastAsyncJournalUpdate, LOG_SLOW_ACCESS_TIME, metadata, metadataMarshaller, opened, pageFile, persistenceAdapterStatistics, preparedTransactions, PROPERTY_LOG_SLOW_ACCESS_TIME, purgeRecoveredXATransactionStrategy, scheduler, storeCache, storedDestinations, UNMATCHED
-
-
Constructor Summary
Constructor, Description
KahaDBStore()
-
Method Summary
Modifier and Type, Method, Description
protected void
addQueueTask(KahaDBStore.KahaDBMessageStore store, org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask task)
protected void
addTopicTask(org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore store, org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask task)
void
beginTransaction(ConnectionContext context)
This method starts a transaction on the persistent storage, which has nothing to do with JMS or XA transactions; it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.
void
checkpoint(boolean sync)
checkpoint any
void
commitTransaction(ConnectionContext context)
Commit a persistence transaction
protected void
configureMetadata()
JobSchedulerStore
createJobSchedulerStore()
Creates and returns a new Job Scheduler store instance.
MessageStore
createQueueMessageStore(ActiveMQQueue destination)
Factory method to create a new queue message store with the given destination name
TopicMessageStore
createTopicMessageStore(ActiveMQTopic destination)
Factory method to create a new topic message store with the given destination name
TransactionStore
createTransactionStore()
Factory method to create a new persistent prepared transaction store for XA recovery
void
deleteAllMessages()
Deletes all the messages in the persistent store.
void
doStart()
void
doStop(ServiceStopper stopper)
void
forgetRecoveredAcks(ArrayList<MessageAck> preparedAcks, boolean isRollback)
Set<ActiveMQDestination>
getDestinations()
Returns a set of all the ActiveMQDestination objects that the persistence store is aware of.
boolean
getForceRecoverIndex()
long
getLastMessageBrokerSequenceId()
long
getLastProducerSequenceId(ProducerId id)
Returns the last stored producer sequenceId for this producer id. It is used to suppress duplicate sends at the transport when a failover reconnect occurs.
int
getMaxAsyncJobs()
TransactionIdTransformer
getTransactionIdTransformer()
SystemUsage
getUsageManager()
boolean
isConcurrentStoreAndDispatchQueues()
boolean
isConcurrentStoreAndDispatchTopics()
boolean
isConcurrentStoreAndDispatchTransactions()
boolean
isPersistNoLocal()
void
removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination.
protected org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask
removeQueueTask(KahaDBStore.KahaDBMessageStore store, MessageId id)
void
removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).
protected org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask
removeTopicTask(org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore store, MessageId id)
void
rollbackTransaction(ConnectionContext context)
Rollback a persistence transaction
void
setBrokerName(String brokerName)
Set the name of the broker using the adapter
void
setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch)
void
setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch)
void
setForceRecoverIndex(boolean forceRecoverIndex)
void
setMaxAsyncJobs(int maxAsyncJobs)
void
setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer)
void
setUsageManager(SystemUsage usageManager)
long
size()
A hint to return the size of the store on disk
String
toString()
void
trackRecoveredAcks(ArrayList<MessageAck> preparedAcks)
-
Methods inherited from class org.apache.activemq.store.kahadb.MessageDatabase
allowIOResumption, checkpointCleanup, clearStoreStats, close, createJournal, decrementAndSubSizeToStoreStat, decrementAndSubSizeToStoreStat, decrementAndSubSizeToStoreStat, decrementAndSubSizeToStoreStat, getCheckpointInterval, getCleanupInterval, getCleanupOnStop, getCompactAcksAfterNoGC, getDirectory, getDirectoryArchive, getExistingStoredDestination, getFailoverProducersAuditDepth, getIndexCacheSize, getIndexDirectory, getIndexLFUEvictionFactor, getIndexWriteBatchSize, getInProgressTxLocationRange, getJournal, getJournalDiskSyncInterval, getJournalDiskSyncStrategy, getJournalDiskSyncStrategyEnum, getJournalFilesBeingReplicated, getJournalMaxFileLength, getJournalMaxWriteBatchSize, getLastAck, getLastUpdatePosition, getMaxFailoverProducersToTrack, getMetadata, getPageFile, getPersistenceAdapterStatistics, getPreallocationScope, getPreallocationStrategy, getPreparedTransaction, getPurgeRecoveredXATransactionStrategy, getPurgeRecoveredXATransactionStrategyEnum, getSequenceSet, getStoredDestination, getStoredMessageCount, getStoredMessageSize, getStoredMessageSize, getStoredMessageStoreStatistics, getStoreStats, getSubStats, getTransactions, incrementalRecover, incrementAndAddSizeToStoreStat, incrementAndAddSizeToStoreStat, incrementAndAddSizeToStoreStat, incrementAndAddSizeToStoreStat, isArchiveCorruptedIndex, isArchiveDataLogs, isCheckForCorruptJournalFiles, isChecksumJournalFiles, isCompactAcksIgnoresStoreGrowth, isDeleteAllMessages, isEnableAckCompaction, isEnableIndexDiskSyncs, isEnableIndexPageCaching, isEnableIndexRecoveryFile, isEnableJournalDiskSyncs, isEnableSubscriptionStatistics, isFailIfDatabaseIsLocked, isIgnoreMissingJournalfiles, isUseIndexLFRUEviction, key, load, load, matchType, open, process, process, process, process, process, process, process, process, process, processLocation, recoverIndex, setArchiveCorruptedIndex, setArchiveDataLogs, setBrokerService, setCheckForCorruptJournalFiles, setCheckpointInterval, setChecksumJournalFiles, 
setCleanupInterval, setCleanupOnStop, setCompactAcksAfterNoGC, setCompactAcksIgnoresStoreGrowth, setDeleteAllMessages, setDirectory, setDirectoryArchive, setEnableAckCompaction, setEnableIndexDiskSyncs, setEnableIndexPageCaching, setEnableIndexRecoveryFile, setEnableIndexWriteAsync, setEnableJournalDiskSyncs, setEnableSubscriptionStatistics, setFailIfDatabaseIsLocked, setFailoverProducersAuditDepth, setIgnoreMissingJournalfiles, setIndexCacheSize, setIndexDirectory, setIndexLFUEvictionFactor, setIndexWriteBatchSize, setJournalDiskSyncInterval, setJournalDiskSyncStrategy, setJournalMaxFileLength, setJournalMaxWriteBatchSize, setMaxFailoverProducersToTrack, setPreallocationScope, setPreallocationStrategy, setPurgeRecoveredXATransactionStrategy, setUseIndexLFRUEviction, store, store, store, store, toByteSequence, unload
-
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, postStop, preStart, removeServiceListener, start, stop
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.apache.activemq.store.PersistenceAdapter
allowIOResumption, getDirectory, setDirectory
-
-
-
-
Field Detail
-
PROPERTY_CANCELED_TASK_MOD_METRIC
public static final String PROPERTY_CANCELED_TASK_MOD_METRIC
- See Also:
- Constant Field Values
-
cancelledTaskModMetric
public static final int cancelledTaskModMetric
-
PROPERTY_ASYNC_EXECUTOR_MAX_THREADS
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS
- See Also:
- Constant Field Values
-
queueExecutor
protected ExecutorService queueExecutor
-
topicExecutor
protected ExecutorService topicExecutor
-
asyncQueueMaps
protected final List<Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask>> asyncQueueMaps
-
asyncTopicMaps
protected final List<Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask>> asyncTopicMaps
-
-
Method Detail
-
setBrokerName
public void setBrokerName(String brokerName)
Description copied from interface: PersistenceAdapter
Set the name of the broker using the adapter.
- Specified by:
setBrokerName in interface PersistenceAdapter
-
setUsageManager
public void setUsageManager(SystemUsage usageManager)
- Specified by:
setUsageManager in interface PersistenceAdapter
- Parameters:
usageManager - The UsageManager that is controlling the broker's memory usage.
-
getUsageManager
public SystemUsage getUsageManager()
-
isConcurrentStoreAndDispatchQueues
public boolean isConcurrentStoreAndDispatchQueues()
- Returns:
- true if concurrent store and dispatch is enabled for queues
-
setConcurrentStoreAndDispatchQueues
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch)
- Parameters:
concurrentStoreAndDispatch - true to enable concurrent store and dispatch for queues
-
isConcurrentStoreAndDispatchTopics
public boolean isConcurrentStoreAndDispatchTopics()
- Returns:
- true if concurrent store and dispatch is enabled for topics
-
setConcurrentStoreAndDispatchTopics
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch)
- Parameters:
concurrentStoreAndDispatch - true to enable concurrent store and dispatch for topics
-
isConcurrentStoreAndDispatchTransactions
public boolean isConcurrentStoreAndDispatchTransactions()
-
getMaxAsyncJobs
public int getMaxAsyncJobs()
- Returns:
- the maxAsyncJobs
-
setMaxAsyncJobs
public void setMaxAsyncJobs(int maxAsyncJobs)
- Parameters:
maxAsyncJobs - the maxAsyncJobs to set
-
configureMetadata
protected void configureMetadata()
- Specified by:
configureMetadata in class MessageDatabase
-
doStart
public void doStart() throws Exception
- Overrides:
doStart in class MessageDatabase
- Throws:
Exception
-
doStop
public void doStop(ServiceStopper stopper) throws Exception
- Overrides:
doStop in class MessageDatabase
- Throws:
Exception
-
removeQueueTask
protected org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask removeQueueTask(KahaDBStore.KahaDBMessageStore store, MessageId id)
-
addQueueTask
protected void addQueueTask(KahaDBStore.KahaDBMessageStore store, org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask task) throws IOException
- Throws:
IOException
-
removeTopicTask
protected org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask removeTopicTask(org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore store, MessageId id)
-
addTopicTask
protected void addTopicTask(org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore store, org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask task) throws IOException
- Throws:
IOException
-
createTransactionStore
public TransactionStore createTransactionStore() throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery.
- Specified by:
createTransactionStore in interface PersistenceAdapter
- Returns:
- transaction store
- Throws:
IOException
-
getForceRecoverIndex
public boolean getForceRecoverIndex()
-
setForceRecoverIndex
public void setForceRecoverIndex(boolean forceRecoverIndex)
-
forgetRecoveredAcks
public void forgetRecoveredAcks(ArrayList<MessageAck> preparedAcks, boolean isRollback) throws IOException
- Throws:
IOException
-
trackRecoveredAcks
public void trackRecoveredAcks(ArrayList<MessageAck> preparedAcks) throws IOException
- Throws:
IOException
-
createQueueMessageStore
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name.
- Specified by:
createQueueMessageStore in interface PersistenceAdapter
- Returns:
- the message store
- Throws:
IOException
-
createTopicMessageStore
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name.
- Specified by:
createTopicMessageStore in interface PersistenceAdapter
- Returns:
- the topic message store
- Throws:
IOException
-
removeQueueMessageStore
public void removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).
- Specified by:
removeQueueMessageStore in interface PersistenceAdapter
- Parameters:
destination - Destination to forget
-
removeTopicMessageStore
public void removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).
- Specified by:
removeTopicMessageStore in interface PersistenceAdapter
- Parameters:
destination - Destination to forget
-
deleteAllMessages
public void deleteAllMessages() throws IOException
Description copied from interface: PersistenceAdapter
Deletes all the messages in the persistent store.
- Specified by:
deleteAllMessages in interface PersistenceAdapter
- Throws:
IOException
-
getDestinations
public Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store is aware of.
- Specified by:
getDestinations in interface PersistenceAdapter
- Returns:
- active destinations
-
getLastMessageBrokerSequenceId
public long getLastMessageBrokerSequenceId() throws IOException
- Specified by:
getLastMessageBrokerSequenceId in interface PersistenceAdapter
- Returns:
- last broker sequence
- Throws:
IOException
-
getLastProducerSequenceId
public long getLastProducerSequenceId(ProducerId id)
Description copied from interface: PersistenceAdapter
Returns the last stored producer sequenceId for this producer id. It is used to suppress duplicate sends at the transport when a failover reconnect occurs.
- Specified by:
getLastProducerSequenceId in interface PersistenceAdapter
- Parameters:
id - the producerId to find a sequenceId for
- Returns:
- the last stored sequence id or -1 if no suppression needed
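The contract above (last stored sequence id, or -1 when nothing is known) can be illustrated with a small stand-alone sketch. It is not ActiveMQ code: the class ProducerSequenceSketch, its recordStored and isDuplicate helpers, the use of plain String producer ids, and the "not newer than the last stored id" comparison rule are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a producer audit; it does not use any ActiveMQ types.
class ProducerSequenceSketch {
    // Highest sequence id persisted so far, keyed by producer id.
    private final Map<String, Long> lastStored = new HashMap<>();

    // Record that a message with this sequence id was persisted.
    void recordStored(String producerId, long sequenceId) {
        lastStored.merge(producerId, sequenceId, Math::max);
    }

    // Mirrors the documented return contract: last stored id, or -1 if unknown.
    long getLastProducerSequenceId(String producerId) {
        return lastStored.getOrDefault(producerId, -1L);
    }

    // Assumed rule: a resend is a duplicate if its id is not newer than the last stored id.
    boolean isDuplicate(String producerId, long sequenceId) {
        return sequenceId <= getLastProducerSequenceId(producerId);
    }

    public static void main(String[] args) {
        ProducerSequenceSketch audit = new ProducerSequenceSketch();
        audit.recordStored("producer-1", 1);
        audit.recordStored("producer-1", 2);
        // After a reconnect, the producer resends message 2 and then sends message 3.
        System.out.println("resend of 2 is duplicate: " + audit.isDuplicate("producer-1", 2));
        System.out.println("send of 3 is duplicate: " + audit.isDuplicate("producer-1", 3));
        System.out.println("unknown producer: " + audit.getLastProducerSequenceId("producer-2"));
    }
}
```

An unknown producer yields -1, so in this sketch any non-negative sequence id from it is accepted, which matches the "no suppression needed" case.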
-
size
public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk.
- Specified by:
size in interface PersistenceAdapter
- Returns:
- disk space used in bytes, or 0 if not implemented
-
beginTransaction
public void beginTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
This method starts a transaction on the persistent storage, which has nothing to do with JMS or XA transactions; it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization. Typically one transaction requires one disk synchronization point, so for high performance it is usually faster to perform many writes within the same transaction to minimize the latency caused by disk synchronization. This is especially true when using tools like Berkeley DB or embedded JDBC servers.
- Specified by:
beginTransaction in interface PersistenceAdapter
- Throws:
IOException
-
commitTransaction
public void commitTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Commit a persistence transaction.
- Specified by:
commitTransaction in interface PersistenceAdapter
- Throws:
IOException
- See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)
-
rollbackTransaction
public void rollbackTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Rollback a persistence transaction.
- Specified by:
rollbackTransaction in interface PersistenceAdapter
- Throws:
IOException
- See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)
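The begin, commit, and rollback methods above describe a batching pattern: many writes, one synchronization point. The following is a minimal sketch of that idea only, assuming a hypothetical in-memory class BatchedWriteSketch with a simulated sync counter. It does not call KahaDBStore, it takes no ConnectionContext, and it throws IllegalStateException where the real methods declare IOException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory store that only mimics the begin/commit/rollback shape.
class BatchedWriteSketch {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private boolean inTransaction;
    private int syncPoints; // counts simulated disk synchronizations

    void beginTransaction() {
        if (inTransaction) {
            throw new IllegalStateException("transaction already open");
        }
        inTransaction = true;
    }

    void write(String record) {
        if (inTransaction) {
            pending.add(record);   // deferred until commit
        } else {
            committed.add(record); // a standalone write pays for its own sync
            syncPoints++;
        }
    }

    void commitTransaction() {
        if (!inTransaction) {
            throw new IllegalStateException("no transaction open");
        }
        committed.addAll(pending);
        pending.clear();
        syncPoints++;              // one sync for the whole batch
        inTransaction = false;
    }

    void rollbackTransaction() {
        pending.clear();           // discard everything written since begin
        inTransaction = false;
    }

    int getSyncPoints() { return syncPoints; }

    int getCommittedCount() { return committed.size(); }

    public static void main(String[] args) {
        BatchedWriteSketch store = new BatchedWriteSketch();
        store.beginTransaction();
        try {
            for (int i = 0; i < 100; i++) {
                store.write("message-" + i);
            }
            store.commitTransaction();
        } catch (RuntimeException e) {
            store.rollbackTransaction();
            throw e;
        }
        System.out.println(store.getCommittedCount() + " records committed with "
                + store.getSyncPoints() + " sync point(s)");
    }
}
```

In this model the 100 batched writes cost a single sync point, whereas the same writes issued outside a transaction would each incur their own.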
-
checkpoint
public void checkpoint(boolean sync) throws IOException
Description copied from interface: PersistenceAdapter
checkpoint any
- Specified by:
checkpoint in interface PersistenceAdapter
- Throws:
IOException
-
getTransactionIdTransformer
public TransactionIdTransformer getTransactionIdTransformer()
-
setTransactionIdTransformer
public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer)
-
createJobSchedulerStore
public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException
Description copied from interface: PersistenceAdapter
Creates and returns a new Job Scheduler store instance.
- Specified by:
createJobSchedulerStore in interface PersistenceAdapter
- Returns:
- a new JobSchedulerStore instance if this Persistence adapter provides its own.
- Throws:
IOException - If an error occurs while creating the new JobSchedulerStore.
UnsupportedOperationException - If this adapter does not provide its own scheduler store implementation.
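Because the method may signal "no scheduler store provided" with UnsupportedOperationException, a caller could reasonably try the adapter first and fall back to another factory. The sketch below shows only that control flow, under stated assumptions: SchedulerStoreFallbackSketch and createOrFallback are hypothetical names, the factories are plain java.util.function.Supplier values rather than ActiveMQ types, and the checked IOException from the real method is left out.

```java
import java.util.function.Supplier;

// Hypothetical helper; the suppliers stand in for an adapter factory and a default factory.
class SchedulerStoreFallbackSketch {
    // Try the adapter's factory first; use the fallback when it reports "not provided".
    static <T> T createOrFallback(Supplier<T> adapterFactory, Supplier<T> fallbackFactory) {
        try {
            return adapterFactory.get();
        } catch (UnsupportedOperationException e) {
            return fallbackFactory.get();
        }
    }

    public static void main(String[] args) {
        // An adapter that does not provide its own scheduler store.
        String store = createOrFallback(
                () -> { throw new UnsupportedOperationException("no scheduler store"); },
                () -> "default-scheduler-store");
        System.out.println(store);
    }
}
```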
-
isPersistNoLocal
public boolean isPersistNoLocal()
- Specified by:
isPersistNoLocal in interface NoLocalSubscriptionAware
-
-