org.apache.activemq.store.kahadb
Class KahaDBPersistenceAdapter

java.lang.Object
  extended by org.apache.activemq.util.ServiceSupport
      extended by org.apache.activemq.broker.LockableServiceSupport
          extended by org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter
All Implemented Interfaces:
BrokerServiceAware, Lockable, Service, JournaledStore, PersistenceAdapter, TransactionIdTransformerAware

public class KahaDBPersistenceAdapter
extends LockableServiceSupport
implements PersistenceAdapter, JournaledStore, TransactionIdTransformerAware

An implementation of PersistenceAdapter designed for use with KahaDB - Embedded Lightweight Non-Relational Database


Field Summary
 
Fields inherited from class org.apache.activemq.broker.LockableServiceSupport
brokerService
 
Constructor Summary
KahaDBPersistenceAdapter()
           
 
Method Summary
 void beginTransaction(ConnectionContext context)
          This method starts a transaction on the persistent storage - which is nothing to do with JMS or XA transactions - its purely a mechanism to perform multiple writes to a persistent store in 1 transaction as a performance optimization.
 void checkpoint(boolean sync)
          checkpoint any
 void commitTransaction(ConnectionContext context)
          Commit a persistence transaction
 Locker createDefaultLocker()
          Create a default locker
 MessageStore createQueueMessageStore(ActiveMQQueue destination)
          Factory method to create a new queue message store with the given destination name
 TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
          Factory method to create a new topic message store with the given destination name
 KahaTransactionInfo createTransactionInfo(TransactionId txid)
           
 TransactionStore createTransactionStore()
          Factory method to create a new persistent prepared transaction store for XA recovery
 void deleteAllMessages()
          Delete's all the messages in the persistent store.
 void doStart()
           
 void doStop(ServiceStopper stopper)
           
 long getCheckpointInterval()
          Get the checkpointInterval
 long getCleanupInterval()
          Get the cleanupInterval
 Set<ActiveMQDestination> getDestinations()
          Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.
 File getDirectory()
          Get the directory
 File getDirectoryArchive()
           
 int getFailoverProducersAuditDepth()
           
 boolean getForceRecoverIndex()
           
 int getIndexCacheSize()
          Get the indexCacheSize
 float getIndexLFUEvictionFactor()
           
 int getIndexWriteBatchSize()
          Get the indexWriteBatchSize
 int getJournalMaxFileLength()
          Get the journalMaxFileLength
 int getJournalMaxWriteBatchSize()
          Get the journalMaxWriteBatchSize
 long getLastMessageBrokerSequenceId()
           
 long getLastProducerSequenceId(ProducerId id)
          return the last stored producer sequenceId for this producer Id used to suppress duplicate sends on failover reconnect at the transport when a reconnect occurs
 int getMaxAsyncJobs()
           
 int getMaxFailoverProducersToTrack()
           
 KahaDBStore getStore()
           
 void init()
          Initialize resources before locking
 boolean isArchiveCorruptedIndex()
           
 boolean isArchiveDataLogs()
           
 boolean isCheckForCorruptJournalFiles()
           
 boolean isChecksumJournalFiles()
           
 boolean isConcurrentStoreAndDispatchQueues()
           
 boolean isConcurrentStoreAndDispatchTopics()
           
 boolean isEnableIndexDiskSyncs()
           
 boolean isEnableIndexPageCaching()
           
 boolean isEnableIndexRecoveryFile()
           
 boolean isEnableIndexWriteAsync()
          Get the enableIndexWriteAsync
 boolean isEnableJournalDiskSyncs()
          Get the enableJournalDiskSyncs
 boolean isIgnoreMissingJournalfiles()
          Get the ignoreMissingJournalfiles
 boolean isRewriteOnRedelivery()
           
 boolean isUseIndexLFRUEviction()
           
 void removeQueueMessageStore(ActiveMQQueue destination)
          Cleanup method to remove any state associated with the given destination.
 void removeTopicMessageStore(ActiveMQTopic destination)
          Cleanup method to remove any state associated with the given destination This method does not stop the message store (it might not be cached).
 void rollbackTransaction(ConnectionContext context)
          Rollback a persistence transaction
 void setArchiveCorruptedIndex(boolean archiveCorruptedIndex)
           
 void setArchiveDataLogs(boolean archiveDataLogs)
           
 void setBrokerName(String brokerName)
          Set the name of the broker using the adapter
 void setBrokerService(BrokerService brokerService)
           
 void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
           
 void setCheckpointInterval(long checkpointInterval)
          Set the checkpointInterval
 void setChecksumJournalFiles(boolean checksumJournalFiles)
           
 void setCleanupInterval(long cleanupInterval)
          Set the cleanupInterval
 void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch)
           
 void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch)
           
 void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay)
          Deprecated. use Locker.setLockAcquireSleepInterval(long) instead
 void setDirectory(File dir)
          Set the directory where any data files should be created
 void setDirectoryArchive(File directoryArchive)
           
 void setEnableIndexDiskSyncs(boolean diskSyncs)
           
 void setEnableIndexPageCaching(boolean enable)
           
 void setEnableIndexRecoveryFile(boolean enable)
           
 void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
          Set the enableIndexWriteAsync
 void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs)
          Set the enableJournalDiskSyncs
 void setFailoverProducersAuditDepth(int failoverProducersAuditDepth)
          set the audit window depth for duplicate suppression (should exceed the max transaction batch)
 void setForceRecoverIndex(boolean forceRecoverIndex)
           
 void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
          Set the ignoreMissingJournalfiles
 void setIndexCacheSize(int indexCacheSize)
          Set the indexCacheSize When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setIndexLFUEvictionFactor(float indexLFUEvictionFactor)
           
 void setIndexWriteBatchSize(int indexWriteBatchSize)
          Set the indexWriteBatchSize When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setJournalMaxFileLength(int journalMaxFileLength)
          When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
          Set the journalMaxWriteBatchSize * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setMaxAsyncJobs(int maxAsyncJobs)
           
 void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack)
          Set the max number of producers (LRU cache) to track for duplicate sends
 void setRewriteOnRedelivery(boolean rewriteOnRedelivery)
          When true, persist the redelivery status such that the message redelivery flag can survive a broker failure used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) true
 void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer)
           
 void setUsageManager(SystemUsage usageManager)
           
 void setUseIndexLFRUEviction(boolean useIndexLFRUEviction)
           
 long size()
          A hint to return the size of the store on disk
 String toString()
           
 
Methods inherited from class org.apache.activemq.broker.LockableServiceSupport
getBrokerService, getLocker, getLockKeepAlivePeriod, getScheduledThreadPoolExecutor, isUseLock, keepLockAlive, postStop, preStart, setLocker, setLockKeepAlivePeriod, setUseLock, stopBroker
 
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, removeServiceListener, start, stop
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.apache.activemq.Service
start, stop
 

Constructor Detail

KahaDBPersistenceAdapter

public KahaDBPersistenceAdapter()
Method Detail

beginTransaction

public void beginTransaction(ConnectionContext context)
                      throws IOException
Description copied from interface: PersistenceAdapter
This method starts a transaction on the persistent storage - which is nothing to do with JMS or XA transactions - its purely a mechanism to perform multiple writes to a persistent store in 1 transaction as a performance optimization.

Typically one transaction will require one disk synchronization point and so for real high performance its usually faster to perform many writes within the same transaction to minimize latency caused by disk synchronization. This is especially true when using tools like Berkeley Db or embedded JDBC servers.

Specified by:
beginTransaction in interface PersistenceAdapter
Parameters:
context -
Throws:
IOException
See Also:
PersistenceAdapter.beginTransaction(org.apache.activemq.broker.ConnectionContext)

checkpoint

public void checkpoint(boolean sync)
                throws IOException
Description copied from interface: PersistenceAdapter
checkpoint any

Specified by:
checkpoint in interface PersistenceAdapter
Parameters:
sync -
Throws:
IOException
See Also:
PersistenceAdapter.checkpoint(boolean)

commitTransaction

public void commitTransaction(ConnectionContext context)
                       throws IOException
Description copied from interface: PersistenceAdapter
Commit a persistence transaction

Specified by:
commitTransaction in interface PersistenceAdapter
Parameters:
context -
Throws:
IOException
See Also:
PersistenceAdapter.commitTransaction(org.apache.activemq.broker.ConnectionContext)

createQueueMessageStore

public MessageStore createQueueMessageStore(ActiveMQQueue destination)
                                     throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name

Specified by:
createQueueMessageStore in interface PersistenceAdapter
Parameters:
destination -
Returns:
MessageStore
Throws:
IOException
See Also:
PersistenceAdapter.createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)

createTopicMessageStore

public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
                                          throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name

Specified by:
createTopicMessageStore in interface PersistenceAdapter
Parameters:
destination -
Returns:
TopicMessageStore
Throws:
IOException
See Also:
PersistenceAdapter.createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)

createTransactionStore

public TransactionStore createTransactionStore()
                                        throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery

Specified by:
createTransactionStore in interface PersistenceAdapter
Returns:
TransactionStore
Throws:
IOException
See Also:
PersistenceAdapter.createTransactionStore()

deleteAllMessages

public void deleteAllMessages()
                       throws IOException
Description copied from interface: PersistenceAdapter
Delete's all the messages in the persistent store.

Specified by:
deleteAllMessages in interface PersistenceAdapter
Throws:
IOException
See Also:
PersistenceAdapter.deleteAllMessages()

getDestinations

public Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.

Specified by:
getDestinations in interface PersistenceAdapter
Returns:
destinations
See Also:
PersistenceAdapter.getDestinations()

getLastMessageBrokerSequenceId

public long getLastMessageBrokerSequenceId()
                                    throws IOException
Specified by:
getLastMessageBrokerSequenceId in interface PersistenceAdapter
Returns:
lastMessageBrokerSequenceId
Throws:
IOException
See Also:
PersistenceAdapter.getLastMessageBrokerSequenceId()

getLastProducerSequenceId

public long getLastProducerSequenceId(ProducerId id)
                               throws IOException
Description copied from interface: PersistenceAdapter
return the last stored producer sequenceId for this producer Id used to suppress duplicate sends on failover reconnect at the transport when a reconnect occurs

Specified by:
getLastProducerSequenceId in interface PersistenceAdapter
Parameters:
id - the producerId to find a sequenceId for
Returns:
the last stored sequence id or -1 if no suppression needed
Throws:
IOException

removeQueueMessageStore

public void removeQueueMessageStore(ActiveMQQueue destination)
Description copied from interface: PersistenceAdapter
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).

Specified by:
removeQueueMessageStore in interface PersistenceAdapter
Parameters:
destination -
See Also:
PersistenceAdapter.removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)

removeTopicMessageStore

public void removeTopicMessageStore(ActiveMQTopic destination)
Description copied from interface: PersistenceAdapter
Cleanup method to remove any state associated with the given destination This method does not stop the message store (it might not be cached).

Specified by:
removeTopicMessageStore in interface PersistenceAdapter
Parameters:
destination -
See Also:
PersistenceAdapter.removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)

rollbackTransaction

public void rollbackTransaction(ConnectionContext context)
                         throws IOException
Description copied from interface: PersistenceAdapter
Rollback a persistence transaction

Specified by:
rollbackTransaction in interface PersistenceAdapter
Parameters:
context -
Throws:
IOException
See Also:
PersistenceAdapter.rollbackTransaction(org.apache.activemq.broker.ConnectionContext)

setBrokerName

public void setBrokerName(String brokerName)
Description copied from interface: PersistenceAdapter
Set the name of the broker using the adapter

Specified by:
setBrokerName in interface PersistenceAdapter
Parameters:
brokerName -
See Also:
PersistenceAdapter.setBrokerName(java.lang.String)

setUsageManager

public void setUsageManager(SystemUsage usageManager)
Specified by:
setUsageManager in interface PersistenceAdapter
Parameters:
usageManager -
See Also:
PersistenceAdapter.setUsageManager(org.apache.activemq.usage.SystemUsage)

size

public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk

Specified by:
size in interface PersistenceAdapter
Returns:
the size of the store
See Also:
PersistenceAdapter.size()

doStart

public void doStart()
             throws Exception
Specified by:
doStart in class ServiceSupport
Throws:
Exception
See Also:
Service.start()

doStop

public void doStop(ServiceStopper stopper)
            throws Exception
Specified by:
doStop in class ServiceSupport
Throws:
Exception
See Also:
Service.stop()

getJournalMaxFileLength

public int getJournalMaxFileLength()
Get the journalMaxFileLength

Specified by:
getJournalMaxFileLength in interface JournaledStore
Returns:
the journalMaxFileLength

setJournalMaxFileLength

public void setJournalMaxFileLength(int journalMaxFileLength)
When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used


setMaxFailoverProducersToTrack

public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack)
Set the max number of producers (LRU cache) to track for duplicate sends


getMaxFailoverProducersToTrack

public int getMaxFailoverProducersToTrack()

setFailoverProducersAuditDepth

public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth)
set the audit window depth for duplicate suppression (should exceed the max transaction batch)


getFailoverProducersAuditDepth

public int getFailoverProducersAuditDepth()

getCheckpointInterval

public long getCheckpointInterval()
Get the checkpointInterval

Returns:
the checkpointInterval

setCheckpointInterval

public void setCheckpointInterval(long checkpointInterval)
Set the checkpointInterval

Parameters:
checkpointInterval - the checkpointInterval to set

getCleanupInterval

public long getCleanupInterval()
Get the cleanupInterval

Returns:
the cleanupInterval

setCleanupInterval

public void setCleanupInterval(long cleanupInterval)
Set the cleanupInterval

Parameters:
cleanupInterval - the cleanupInterval to set

getIndexWriteBatchSize

public int getIndexWriteBatchSize()
Get the indexWriteBatchSize

Returns:
the indexWriteBatchSize

setIndexWriteBatchSize

public void setIndexWriteBatchSize(int indexWriteBatchSize)
Set the indexWriteBatchSize When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used

Parameters:
indexWriteBatchSize - the indexWriteBatchSize to set

getJournalMaxWriteBatchSize

public int getJournalMaxWriteBatchSize()
Get the journalMaxWriteBatchSize

Returns:
the journalMaxWriteBatchSize

setJournalMaxWriteBatchSize

public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
Set the journalMaxWriteBatchSize * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used

Parameters:
journalMaxWriteBatchSize - the journalMaxWriteBatchSize to set

isEnableIndexWriteAsync

public boolean isEnableIndexWriteAsync()
Get the enableIndexWriteAsync

Returns:
the enableIndexWriteAsync

setEnableIndexWriteAsync

public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
Set the enableIndexWriteAsync

Parameters:
enableIndexWriteAsync - the enableIndexWriteAsync to set

getDirectory

public File getDirectory()
Get the directory

Specified by:
getDirectory in interface PersistenceAdapter
Returns:
the directory

setDirectory

public void setDirectory(File dir)
Description copied from interface: PersistenceAdapter
Set the directory where any data files should be created

Specified by:
setDirectory in interface PersistenceAdapter
Parameters:
dir -
See Also:
PersistenceAdapter.setDirectory(java.io.File)

isEnableJournalDiskSyncs

public boolean isEnableJournalDiskSyncs()
Get the enableJournalDiskSyncs

Returns:
the enableJournalDiskSyncs

setEnableJournalDiskSyncs

public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs)
Set the enableJournalDiskSyncs

Parameters:
enableJournalDiskSyncs - the enableJournalDiskSyncs to set

getIndexCacheSize

public int getIndexCacheSize()
Get the indexCacheSize

Returns:
the indexCacheSize

setIndexCacheSize

public void setIndexCacheSize(int indexCacheSize)
Set the indexCacheSize When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used

Parameters:
indexCacheSize - the indexCacheSize to set

isIgnoreMissingJournalfiles

public boolean isIgnoreMissingJournalfiles()
Get the ignoreMissingJournalfiles

Returns:
the ignoreMissingJournalfiles

setIgnoreMissingJournalfiles

public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
Set the ignoreMissingJournalfiles

Parameters:
ignoreMissingJournalfiles - the ignoreMissingJournalfiles to set

isChecksumJournalFiles

public boolean isChecksumJournalFiles()

isCheckForCorruptJournalFiles

public boolean isCheckForCorruptJournalFiles()

setChecksumJournalFiles

public void setChecksumJournalFiles(boolean checksumJournalFiles)

setCheckForCorruptJournalFiles

public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)

setBrokerService

public void setBrokerService(BrokerService brokerService)
Specified by:
setBrokerService in interface BrokerServiceAware
Overrides:
setBrokerService in class LockableServiceSupport

isArchiveDataLogs

public boolean isArchiveDataLogs()

setArchiveDataLogs

public void setArchiveDataLogs(boolean archiveDataLogs)

getDirectoryArchive

public File getDirectoryArchive()

setDirectoryArchive

public void setDirectoryArchive(File directoryArchive)

isConcurrentStoreAndDispatchQueues

public boolean isConcurrentStoreAndDispatchQueues()

setConcurrentStoreAndDispatchQueues

public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch)

isConcurrentStoreAndDispatchTopics

public boolean isConcurrentStoreAndDispatchTopics()

setConcurrentStoreAndDispatchTopics

public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch)

getMaxAsyncJobs

public int getMaxAsyncJobs()

setMaxAsyncJobs

public void setMaxAsyncJobs(int maxAsyncJobs)
Parameters:
maxAsyncJobs - the maxAsyncJobs to set

setDatabaseLockedWaitDelay

@Deprecated
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay)
                                throws IOException
Deprecated. use Locker.setLockAcquireSleepInterval(long) instead

Parameters:
databaseLockedWaitDelay - the databaseLockedWaitDelay to set
Throws:
IOException

getForceRecoverIndex

public boolean getForceRecoverIndex()

setForceRecoverIndex

public void setForceRecoverIndex(boolean forceRecoverIndex)

isArchiveCorruptedIndex

public boolean isArchiveCorruptedIndex()

setArchiveCorruptedIndex

public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex)

setRewriteOnRedelivery

public void setRewriteOnRedelivery(boolean rewriteOnRedelivery)
When true, persist the redelivery status such that the message redelivery flag can survive a broker failure used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) true


isRewriteOnRedelivery

public boolean isRewriteOnRedelivery()

getIndexLFUEvictionFactor

public float getIndexLFUEvictionFactor()

setIndexLFUEvictionFactor

public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor)

isUseIndexLFRUEviction

public boolean isUseIndexLFRUEviction()

setUseIndexLFRUEviction

public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction)

setEnableIndexDiskSyncs

public void setEnableIndexDiskSyncs(boolean diskSyncs)

isEnableIndexDiskSyncs

public boolean isEnableIndexDiskSyncs()

setEnableIndexRecoveryFile

public void setEnableIndexRecoveryFile(boolean enable)

isEnableIndexRecoveryFile

public boolean isEnableIndexRecoveryFile()

setEnableIndexPageCaching

public void setEnableIndexPageCaching(boolean enable)

isEnableIndexPageCaching

public boolean isEnableIndexPageCaching()

getStore

public KahaDBStore getStore()

createTransactionInfo

public KahaTransactionInfo createTransactionInfo(TransactionId txid)

createDefaultLocker

public Locker createDefaultLocker()
                           throws IOException
Description copied from interface: Lockable
Create a default locker

Specified by:
createDefaultLocker in interface Lockable
Returns:
default locker
Throws:
IOException

init

public void init()
          throws Exception
Description copied from class: LockableServiceSupport
Initialize resources before locking

Specified by:
init in class LockableServiceSupport
Throws:
Exception

toString

public String toString()
Overrides:
toString in class Object

setTransactionIdTransformer

public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer)
Specified by:
setTransactionIdTransformer in interface TransactionIdTransformerAware


Copyright © 2005–2013 The Apache Software Foundation. All rights reserved.