org.apache.activemq.store.kahadb
Class KahaDBStore

java.lang.Object
  extended by org.apache.activemq.util.ServiceSupport
      extended by org.apache.activemq.store.kahadb.MessageDatabase
          extended by org.apache.activemq.store.kahadb.KahaDBStore
All Implemented Interfaces:
BrokerServiceAware, Service, PersistenceAdapter

public class KahaDBStore
extends MessageDatabase
implements PersistenceAdapter


Nested Class Summary
 class KahaDBStore.KahaDBMessageStore
           
static interface KahaDBStore.StoreTask
           
 class KahaDBStore.StoreTaskExecutor
           
 
Nested classes/interfaces inherited from class org.apache.activemq.store.kahadb.MessageDatabase
MessageDatabase.LastAckMarshaller, MessageDatabase.MessageKeysMarshaller, MessageDatabase.Metadata, MessageDatabase.StoredDestinationMarshaller
 
Field Summary
protected  List<Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask>> asyncQueueMaps
           
protected  List<Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask>> asyncTopicMaps
           
static int cancelledTaskModMetric
           
static String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS
           
static String PROPERTY_CANCELED_TASK_MOD_METRIC
           
protected  ExecutorService queueExecutor
           
protected  ExecutorService topicExecutor
           
 
Fields inherited from class org.apache.activemq.store.kahadb.MessageDatabase
ackedAndPrepared, archiveDataLogs, brokerService, checkpointThread, DEFAULT_DIRECTORY, deleteAllMessages, directory, directoryArchive, enableJournalDiskSyncs, failIfDatabaseIsLocked, forceRecoverIndex, indexLock, journal, journalSize, LOG_SLOW_ACCESS_TIME, metadata, metadataMarshaller, opened, pageFile, preparedTransactions, PROPERTY_LOG_SLOW_ACCESS_TIME, UNMATCHED
 
Constructor Summary
KahaDBStore()
           
 
Method Summary
protected  void addQueueTask(KahaDBStore.KahaDBMessageStore store, org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask task)
           
protected  void addTopicTask(org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore store, org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask task)
           
 void beginTransaction(ConnectionContext context)
          This method starts a transaction on the persistent storage - which has nothing to do with JMS or XA transactions - it's purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.
 void checkpoint(boolean sync)
          Performs a checkpoint of the persistent store.
 void commitTransaction(ConnectionContext context)
          Commit a persistence transaction
 MessageStore createQueueMessageStore(ActiveMQQueue destination)
          Factory method to create a new queue message store with the given destination name
 TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
          Factory method to create a new topic message store with the given destination name
 TransactionStore createTransactionStore()
          Factory method to create a new persistent prepared transaction store for XA recovery
 void deleteAllMessages()
          Deletes all the messages in the persistent store.
 void doStart()
           
 void doStop(ServiceStopper stopper)
           
 Set<ActiveMQDestination> getDestinations()
          Returns a set of all the ActiveMQDestination objects that the persistence store is aware of.
 boolean getForceRecoverIndex()
           
 long getLastMessageBrokerSequenceId()
           
 long getLastProducerSequenceId(ProducerId id)
          Returns the last stored producer sequenceId for this producer id; used at the transport to suppress duplicate sends when a failover reconnect occurs.
 int getMaxAsyncJobs()
           
 TransactionIdTransformer getTransactionIdTransformer()
           
 SystemUsage getUsageManager()
           
 boolean isConcurrentStoreAndDispatchQueues()
           
 boolean isConcurrentStoreAndDispatchTopics()
           
 boolean isConcurrentStoreAndDispatchTransactions()
           
 void removeQueueMessageStore(ActiveMQQueue destination)
          Cleanup method to remove any state associated with the given destination.
protected  org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask removeQueueTask(KahaDBStore.KahaDBMessageStore store, MessageId id)
           
 void removeTopicMessageStore(ActiveMQTopic destination)
          Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).
protected  org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask removeTopicTask(org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore store, MessageId id)
           
 void rollbackTransaction(ConnectionContext context)
          Rollback a persistence transaction
 void setBrokerName(String brokerName)
          Set the name of the broker using the adapter
 void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch)
           
 void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch)
           
 void setForceRecoverIndex(boolean forceRecoverIndex)
           
 void setMaxAsyncJobs(int maxAsyncJobs)
           
 void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer)
           
 void setUsageManager(SystemUsage usageManager)
           
 long size()
          A hint to return the size of the store on disk
 String toString()
           
 
Methods inherited from class org.apache.activemq.store.kahadb.MessageDatabase
checkpointCleanup, close, forgetRecoveredAcks, getCheckpointInterval, getCleanupInterval, getDirectory, getDirectoryArchive, getExistingStoredDestination, getFailoverProducersAuditDepth, getIndexCacheSize, getIndexLFUEvictionFactor, getIndexWriteBatchSize, getInProgressTxLocationRange, getJournal, getJournalFilesBeingReplicated, getJournalMaxFileLength, getJournalMaxWriteBatchSize, getLastAck, getLastUpdatePosition, getMaxFailoverProducersToTrack, getPageFile, getStoredDestination, getStoredMessageCount, getTransactions, incrementalRecover, isArchiveCorruptedIndex, isArchiveDataLogs, isCheckForCorruptJournalFiles, isChecksumJournalFiles, isDeleteAllMessages, isEnableIndexDiskSyncs, isEnableIndexPageCaching, isEnableIndexRecoveryFile, isEnableJournalDiskSyncs, isFailIfDatabaseIsLocked, isIgnoreMissingJournalfiles, isRewriteOnRedelivery, isUseIndexLFRUEviction, load, load, open, process, process, process, process, process, process, process, processLocation, recoverIndex, setArchiveCorruptedIndex, setArchiveDataLogs, setBrokerService, setCheckForCorruptJournalFiles, setCheckpointInterval, setChecksumJournalFiles, setCleanupInterval, setDeleteAllMessages, setDirectory, setDirectoryArchive, setEnableIndexDiskSyncs, setEnableIndexPageCaching, setEnableIndexRecoveryFile, setEnableIndexWriteAsync, setEnableJournalDiskSyncs, setFailIfDatabaseIsLocked, setFailoverProducersAuditDepth, setIgnoreMissingJournalfiles, setIndexCacheSize, setIndexLFUEvictionFactor, setIndexWriteBatchSize, setJournalMaxFileLength, setJournalMaxWriteBatchSize, setMaxFailoverProducersToTrack, setRewriteOnRedelivery, setUseIndexLFRUEviction, store, store, store, store, toByteSequence, trackRecoveredAcks, unload
 
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, postStop, preStart, removeServiceListener, start, stop
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.apache.activemq.store.PersistenceAdapter
getDirectory, setDirectory
 
Methods inherited from interface org.apache.activemq.Service
start, stop
 

Field Detail

PROPERTY_CANCELED_TASK_MOD_METRIC

public static final String PROPERTY_CANCELED_TASK_MOD_METRIC
See Also:
Constant Field Values

cancelledTaskModMetric

public static final int cancelledTaskModMetric

PROPERTY_ASYNC_EXECUTOR_MAX_THREADS

public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS
See Also:
Constant Field Values

queueExecutor

protected ExecutorService queueExecutor

topicExecutor

protected ExecutorService topicExecutor

asyncQueueMaps

protected final List<Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask>> asyncQueueMaps

asyncTopicMaps

protected final List<Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask>> asyncTopicMaps
Constructor Detail

KahaDBStore

public KahaDBStore()
Method Detail

toString

public String toString()
Overrides:
toString in class Object

setBrokerName

public void setBrokerName(String brokerName)
Description copied from interface: PersistenceAdapter
Set the name of the broker using the adapter

Specified by:
setBrokerName in interface PersistenceAdapter

setUsageManager

public void setUsageManager(SystemUsage usageManager)
Specified by:
setUsageManager in interface PersistenceAdapter
Parameters:
usageManager - The UsageManager that is controlling the broker's memory usage.

getUsageManager

public SystemUsage getUsageManager()

isConcurrentStoreAndDispatchQueues

public boolean isConcurrentStoreAndDispatchQueues()
Returns:
the concurrentStoreAndDispatch

setConcurrentStoreAndDispatchQueues

public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch)
Parameters:
concurrentStoreAndDispatch - the concurrentStoreAndDispatch to set

isConcurrentStoreAndDispatchTopics

public boolean isConcurrentStoreAndDispatchTopics()
Returns:
the concurrentStoreAndDispatch

setConcurrentStoreAndDispatchTopics

public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch)
Parameters:
concurrentStoreAndDispatch - the concurrentStoreAndDispatch to set

isConcurrentStoreAndDispatchTransactions

public boolean isConcurrentStoreAndDispatchTransactions()

getMaxAsyncJobs

public int getMaxAsyncJobs()
Returns:
the maxAsyncJobs

setMaxAsyncJobs

public void setMaxAsyncJobs(int maxAsyncJobs)
Parameters:
maxAsyncJobs - the maxAsyncJobs to set

doStart

public void doStart()
             throws Exception
Overrides:
doStart in class MessageDatabase
Throws:
Exception

doStop

public void doStop(ServiceStopper stopper)
            throws Exception
Overrides:
doStop in class MessageDatabase
Throws:
Exception

removeQueueTask

protected org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask removeQueueTask(KahaDBStore.KahaDBMessageStore store,
                                                                                      MessageId id)

addQueueTask

protected void addQueueTask(KahaDBStore.KahaDBMessageStore store,
                            org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask task)
                     throws IOException
Throws:
IOException

removeTopicTask

protected org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask removeTopicTask(org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore store,
                                                                                      MessageId id)

addTopicTask

protected void addTopicTask(org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore store,
                            org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask task)
                     throws IOException
Throws:
IOException

createTransactionStore

public TransactionStore createTransactionStore()
                                        throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery

Specified by:
createTransactionStore in interface PersistenceAdapter
Returns:
transaction store
Throws:
IOException

getForceRecoverIndex

public boolean getForceRecoverIndex()

setForceRecoverIndex

public void setForceRecoverIndex(boolean forceRecoverIndex)

createQueueMessageStore

public MessageStore createQueueMessageStore(ActiveMQQueue destination)
                                     throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name

Specified by:
createQueueMessageStore in interface PersistenceAdapter
Returns:
the message store
Throws:
IOException

createTopicMessageStore

public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
                                          throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name

Specified by:
createTopicMessageStore in interface PersistenceAdapter
Returns:
the topic message store
Throws:
IOException

removeQueueMessageStore

public void removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).

Specified by:
removeQueueMessageStore in interface PersistenceAdapter
Parameters:
destination - Destination to forget

removeTopicMessageStore

public void removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).

Specified by:
removeTopicMessageStore in interface PersistenceAdapter
Parameters:
destination - Destination to forget

deleteAllMessages

public void deleteAllMessages()
                       throws IOException
Description copied from interface: PersistenceAdapter
Deletes all the messages in the persistent store.

Specified by:
deleteAllMessages in interface PersistenceAdapter
Throws:
IOException

getDestinations

public Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store is aware of.

Specified by:
getDestinations in interface PersistenceAdapter
Returns:
active destinations

getLastMessageBrokerSequenceId

public long getLastMessageBrokerSequenceId()
                                    throws IOException
Specified by:
getLastMessageBrokerSequenceId in interface PersistenceAdapter
Returns:
last broker sequence
Throws:
IOException

getLastProducerSequenceId

public long getLastProducerSequenceId(ProducerId id)
Description copied from interface: PersistenceAdapter
Returns the last stored producer sequenceId for this producer id; used at the transport to suppress duplicate sends when a failover reconnect occurs.

Specified by:
getLastProducerSequenceId in interface PersistenceAdapter
Parameters:
id - the producerId to find a sequenceId for
Returns:
the last stored sequence id or -1 if no suppression needed

size

public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk

Specified by:
size in interface PersistenceAdapter
Returns:
disk space used in bytes, or 0 if not implemented

beginTransaction

public void beginTransaction(ConnectionContext context)
                      throws IOException
Description copied from interface: PersistenceAdapter
This method starts a transaction on the persistent storage - which has nothing to do with JMS or XA transactions - it's purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.

Typically one transaction will require one disk synchronization point, so for really high performance it's usually faster to perform many writes within the same transaction to minimize latency caused by disk synchronization. This is especially true when using tools like Berkeley DB or embedded JDBC servers.

Specified by:
beginTransaction in interface PersistenceAdapter
Throws:
IOException

commitTransaction

public void commitTransaction(ConnectionContext context)
                       throws IOException
Description copied from interface: PersistenceAdapter
Commit a persistence transaction

Specified by:
commitTransaction in interface PersistenceAdapter
Throws:
IOException
See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)

rollbackTransaction

public void rollbackTransaction(ConnectionContext context)
                         throws IOException
Description copied from interface: PersistenceAdapter
Rollback a persistence transaction

Specified by:
rollbackTransaction in interface PersistenceAdapter
Throws:
IOException
See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)

checkpoint

public void checkpoint(boolean sync)
                throws IOException
Description copied from interface: PersistenceAdapter
Performs a checkpoint of the persistent store.

Specified by:
checkpoint in interface PersistenceAdapter
Throws:
IOException

getTransactionIdTransformer

public TransactionIdTransformer getTransactionIdTransformer()

setTransactionIdTransformer

public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer)


Copyright © 2005–2013 The Apache Software Foundation. All rights reserved.