Package org.apache.activemq.store.kahadb
Class MultiKahaDBPersistenceAdapter
- java.lang.Object
  - org.apache.activemq.util.ServiceSupport
    - org.apache.activemq.broker.LockableServiceSupport
      - org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter
- All Implemented Interfaces: BrokerServiceAware, Lockable, Service, NoLocalSubscriptionAware, PersistenceAdapter
public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, BrokerServiceAware, NoLocalSubscriptionAware
An implementation of PersistenceAdapter that supports distribution of destinations across multiple KahaDB persistence adapters.
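To make the idea of distributing destinations across several stores concrete, here is a minimal sketch that does not use any ActiveMQ classes. In the real adapter the mapping is supplied through setFilteredPersistenceAdapters; everything below (the `DestinationRouter` class, its method names, the store names, and the simplified matching rule in which `*` matches one dot-separated segment and a trailing `>` matches the remainder) is an assumption made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: routes destination names to named stores by wildcard filter. */
class DestinationRouter {
    // Filter -> store name, checked in insertion order; the first match wins.
    private final Map<String, String> routes = new LinkedHashMap<>();
    private final String defaultStore;

    DestinationRouter(String defaultStore) {
        this.defaultStore = defaultStore;
    }

    void addRoute(String filter, String storeName) {
        routes.put(filter, storeName);
    }

    /** Returns the store for a destination, or the default store when no filter matches. */
    String storeFor(String destination) {
        for (Map.Entry<String, String> entry : routes.entrySet()) {
            if (matches(entry.getKey(), destination)) {
                return entry.getValue();
            }
        }
        return defaultStore;
    }

    /** Simplified matching: "*" matches one segment, ">" matches all remaining segments. */
    static boolean matches(String filter, String destination) {
        String[] f = filter.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals(">")) {
                return true; // everything after this point is accepted
            }
            if (i >= d.length) {
                return false; // filter has more segments than the destination
            }
            if (!f[i].equals("*") && !f[i].equals(d[i])) {
                return false; // literal segment mismatch
            }
        }
        return f.length == d.length;
    }
}
```

With routes `orders.>` and `audit.*` registered, `storeFor("orders.eu.new")` resolves to the first store, while a destination that matches neither filter falls back to the default store.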
-
-
Field Summary
-
Fields inherited from class org.apache.activemq.broker.LockableServiceSupport
brokerService, clockDaemon
-
-
Constructor Summary
MultiKahaDBPersistenceAdapter()
-
Method Summary
void allowIOResumption()
void beginTransaction(ConnectionContext context)
  Starts a transaction on the persistent store. This has nothing to do with JMS or XA transactions; it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.
void checkpoint(boolean cleanup)
  checkpoint any
void commitTransaction(ConnectionContext context)
  Commits a persistence transaction.
Locker createDefaultLocker()
  Creates a default locker.
JobSchedulerStore createJobSchedulerStore()
  Creates and returns a new Job Scheduler store instance.
MessageStore createQueueMessageStore(ActiveMQQueue destination)
  Factory method to create a new queue message store with the given destination name.
TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
  Factory method to create a new topic message store with the given destination name.
TransactionStore createTransactionStore()
  Factory method to create a new persistent prepared transaction store for XA recovery.
void deleteAllMessages()
  Deletes all the messages in the persistent store.
void doStart()
protected void doStop(ServiceStopper stopper)
List<PersistenceAdapter> getAdapters()
Set<ActiveMQDestination> getDestinations()
  Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.
File getDirectory()
long getJournalCleanupInterval()
int getJournalMaxFileLength()
int getJournalWriteBatchSize()
long getLastMessageBrokerSequenceId()
long getLastProducerSequenceId(ProducerId id)
  Returns the last stored sequence ID for this producer ID. It is used to suppress duplicate sends at the transport when a failover reconnect occurs.
void init()
  Initializes resources before locking.
boolean isCheckForCorruption()
boolean isLocalXid(TransactionId xid)
boolean isPersistNoLocal()
static String nameFromDestinationFilter(ActiveMQDestination destination)
void removeQueueMessageStore(ActiveMQQueue destination)
  Cleanup method to remove any state associated with the given destination.
void removeTopicMessageStore(ActiveMQTopic destination)
  Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).
void rollbackTransaction(ConnectionContext context)
  Rolls back a persistence transaction.
void setBrokerName(String brokerName)
  Sets the name of the broker using the adapter.
void setBrokerService(BrokerService brokerService)
void setCheckForCorruption(boolean checkForCorruption)
void setDirectory(File directory)
  Sets the directory where any data files should be created.
void setFilteredPersistenceAdapters(List entries)
  Sets the FilteredKahaDBPersistenceAdapter entries.
void setJournalCleanupInterval(long journalCleanupInterval)
void setJournalMaxFileLength(int maxFileLength)
  Sets the maximum file length of the transaction journal. When set using XBean, values of the form "20 Mb", "1024kb", and "1g" can be used.
void setJournalWriteBatchSize(int journalWriteBatchSize)
  Sets the maximum write batch size of the transaction journal. When set using XBean, values of the form "20 Mb", "1024kb", and "1g" can be used.
void setTransactionStore(MultiKahaDBTransactionStore transactionStore)
void setUsageManager(SystemUsage usageManager)
long size()
  A hint to return the size of the store on disk.
String toString()
-
Methods inherited from class org.apache.activemq.broker.LockableServiceSupport
getBrokerService, getLocker, getLockKeepAlivePeriod, getScheduledThreadPoolExecutor, isStopOnError, isUseLock, keepLockAlive, postStop, preStart, setLocker, setLockKeepAlivePeriod, setScheduledThreadPoolExecutor, setStopOnError, setUseLock, stopBroker
-
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, removeServiceListener, start, stop
-
-
-
-
Method Detail
-
setFilteredPersistenceAdapters
public void setFilteredPersistenceAdapters(List entries)
Sets the FilteredKahaDBPersistenceAdapter entries
-
nameFromDestinationFilter
public static String nameFromDestinationFilter(ActiveMQDestination destination)
-
isLocalXid
public boolean isLocalXid(TransactionId xid)
-
beginTransaction
public void beginTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Starts a transaction on the persistent store. This has nothing to do with JMS or XA transactions; it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization. Typically one transaction requires one disk synchronization point, so for high performance it is usually faster to perform many writes within the same transaction to minimize the latency caused by disk synchronization. This is especially true when using tools like Berkeley DB or embedded JDBC servers.
- Specified by: beginTransaction in interface PersistenceAdapter
- Throws: IOException
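The performance argument above (one disk synchronization point per transaction instead of one per write) can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not how KahaDB is implemented: the `BatchingStore` class, its `write` method, and the sync counter are hypothetical and merely simulate the cost of a sync.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory store that counts simulated disk synchronization points. */
class BatchingStore {
    private final List<String> durable = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private boolean inTransaction;
    private int syncCount;

    void beginTransaction() {
        if (inTransaction) {
            throw new IllegalStateException("transaction already open");
        }
        inTransaction = true;
    }

    void write(String record) {
        if (inTransaction) {
            pending.add(record); // buffered until commit, no sync yet
        } else {
            durable.add(record);
            syncCount++; // a standalone write pays for its own sync
        }
    }

    void commitTransaction() {
        requireTransaction();
        durable.addAll(pending);
        pending.clear();
        syncCount++; // one sync covers the whole batch
        inTransaction = false;
    }

    void rollbackTransaction() {
        requireTransaction();
        pending.clear(); // buffered writes are discarded, nothing is synced
        inTransaction = false;
    }

    int syncCount() {
        return syncCount;
    }

    int size() {
        return durable.size();
    }

    private void requireTransaction() {
        if (!inTransaction) {
            throw new IllegalStateException("no open transaction");
        }
    }
}
```

In this model, three standalone writes cost three simulated syncs, while the same three writes wrapped in beginTransaction and commitTransaction cost one.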
-
checkpoint
public void checkpoint(boolean cleanup) throws IOException
Description copied from interface: PersistenceAdapter
checkpoint any
- Specified by: checkpoint in interface PersistenceAdapter
- Throws: IOException
-
commitTransaction
public void commitTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Commits a persistence transaction.
- Specified by: commitTransaction in interface PersistenceAdapter
- Throws: IOException
- See Also: PersistenceAdapter.beginTransaction(ConnectionContext context)
-
createQueueMessageStore
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name.
- Specified by: createQueueMessageStore in interface PersistenceAdapter
- Returns: the message store
- Throws: IOException
-
createTopicMessageStore
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name.
- Specified by: createTopicMessageStore in interface PersistenceAdapter
- Returns: the topic message store
- Throws: IOException
-
createTransactionStore
public TransactionStore createTransactionStore() throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery.
- Specified by: createTransactionStore in interface PersistenceAdapter
- Returns: transaction store
- Throws: IOException
-
deleteAllMessages
public void deleteAllMessages() throws IOException
Description copied from interface: PersistenceAdapter
Deletes all the messages in the persistent store.
- Specified by: deleteAllMessages in interface PersistenceAdapter
- Throws: IOException
-
getDestinations
public Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.
- Specified by: getDestinations in interface PersistenceAdapter
- Returns: active destinations
-
getLastMessageBrokerSequenceId
public long getLastMessageBrokerSequenceId() throws IOException
- Specified by: getLastMessageBrokerSequenceId in interface PersistenceAdapter
- Returns: last broker sequence
- Throws: IOException
-
getLastProducerSequenceId
public long getLastProducerSequenceId(ProducerId id) throws IOException
Description copied from interface: PersistenceAdapter
Returns the last stored sequence ID for this producer ID. It is used to suppress duplicate sends at the transport when a failover reconnect occurs.
- Specified by: getLastProducerSequenceId in interface PersistenceAdapter
- Parameters: id - the producerId to find a sequenceId for
- Returns: the last stored sequence ID, or -1 if no suppression is needed
- Throws: IOException
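The following sketch shows one way a caller might use a "last stored sequence ID" to drop duplicate sends after a reconnect. It is an illustration only: `ProducerSequenceTracker`, its `store` method, and the use of plain strings as producer IDs are assumptions, and the only behavior taken from the description above is returning -1 when nothing has been stored for a producer.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker of the highest stored sequence ID per producer. */
class ProducerSequenceTracker {
    private final Map<String, Long> lastStored = new HashMap<>();

    /** Returns the last stored sequence ID for the producer, or -1 if none is known. */
    long getLastProducerSequenceId(String producerId) {
        return lastStored.getOrDefault(producerId, -1L);
    }

    /**
     * Records a message as stored. Returns false when the sequence ID is not
     * newer than the last stored one, which this sketch treats as a duplicate.
     */
    boolean store(String producerId, long sequenceId) {
        if (sequenceId <= getLastProducerSequenceId(producerId)) {
            return false; // already stored before the reconnect, suppress it
        }
        lastStored.put(producerId, sequenceId);
        return true;
    }
}
```

After a reconnect, a producer that resends sequence IDs it had already delivered would see `store` return false for them, and only the IDs above the last stored value would be accepted.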
-
allowIOResumption
public void allowIOResumption()
- Specified by: allowIOResumption in interface PersistenceAdapter
-
removeQueueMessageStore
public void removeQueueMessageStore(ActiveMQQueue destination)
Description copied from interface: PersistenceAdapter
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).
- Specified by: removeQueueMessageStore in interface PersistenceAdapter
- Parameters: destination - Destination to forget
-
removeTopicMessageStore
public void removeTopicMessageStore(ActiveMQTopic destination)
Description copied from interface: PersistenceAdapter
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).
- Specified by: removeTopicMessageStore in interface PersistenceAdapter
- Parameters: destination - Destination to forget
-
rollbackTransaction
public void rollbackTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Rolls back a persistence transaction.
- Specified by: rollbackTransaction in interface PersistenceAdapter
- Throws: IOException
- See Also: PersistenceAdapter.beginTransaction(ConnectionContext context)
-
setBrokerName
public void setBrokerName(String brokerName)
Description copied from interface: PersistenceAdapter
Sets the name of the broker using the adapter.
- Specified by: setBrokerName in interface PersistenceAdapter
-
setUsageManager
public void setUsageManager(SystemUsage usageManager)
- Specified by: setUsageManager in interface PersistenceAdapter
- Parameters: usageManager - The UsageManager that is controlling the broker's memory usage.
-
size
public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk.
- Specified by: size in interface PersistenceAdapter
- Returns: disk space used in bytes, or 0 if not implemented
-
doStart
public void doStart() throws Exception
- Specified by: doStart in class ServiceSupport
- Throws: Exception
-
doStop
protected void doStop(ServiceStopper stopper) throws Exception
- Specified by: doStop in class ServiceSupport
- Throws: Exception
-
getDirectory
public File getDirectory()
- Specified by: getDirectory in interface PersistenceAdapter
- Returns: the directory used by the persistence adapter
-
setDirectory
public void setDirectory(File directory)
Description copied from interface: PersistenceAdapter
Sets the directory where any data files should be created.
- Specified by: setDirectory in interface PersistenceAdapter
-
init
public void init() throws Exception
Description copied from class: LockableServiceSupport
Initializes resources before locking.
- Specified by: init in class LockableServiceSupport
- Throws: Exception
-
setBrokerService
public void setBrokerService(BrokerService brokerService)
- Specified by: setBrokerService in interface BrokerServiceAware
- Overrides: setBrokerService in class LockableServiceSupport
-
setTransactionStore
public void setTransactionStore(MultiKahaDBTransactionStore transactionStore)
-
setJournalMaxFileLength
public void setJournalMaxFileLength(int maxFileLength)
Sets the maximum file length of the transaction journal. When set using XBean, values of the form "20 Mb", "1024kb", and "1g" can be used.
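As an illustration of what strings such as "20 Mb", "1024kb", and "1g" could resolve to, here is a small standalone parser. It is a sketch, not the converter that XBean actually applies: the `SizeStringParser` class is hypothetical, and the choice of binary multiples (1 kb = 1024 bytes) is an assumption.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for human-readable sizes such as "20 Mb", "1024kb", "1g". */
class SizeStringParser {
    // Digits, optional whitespace, optional unit suffix; case-insensitive.
    private static final Pattern SIZE = Pattern.compile(
            "^\\s*(\\d+)\\s*(b|k|kb|m|mb|g|gb)?\\s*$", Pattern.CASE_INSENSITIVE);

    /** Returns the size in bytes, assuming binary multiples (1 kb = 1024 bytes). */
    static long parseBytes(String text) {
        Matcher m = SIZE.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unrecognized size: " + text);
        }
        long value = Long.parseLong(m.group(1));
        String unit = m.group(2) == null ? "b" : m.group(2).toLowerCase(Locale.ROOT);
        switch (unit.charAt(0)) {
            case 'k':
                return value * 1024L;
            case 'm':
                return value * 1024L * 1024L;
            case 'g':
                return value * 1024L * 1024L * 1024L;
            default:
                return value; // plain bytes
        }
    }
}
```

Because setJournalMaxFileLength takes an int, any parsed value passed to it would also need to fit within Integer.MAX_VALUE.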
-
getJournalMaxFileLength
public int getJournalMaxFileLength()
-
setJournalWriteBatchSize
public void setJournalWriteBatchSize(int journalWriteBatchSize)
Sets the maximum write batch size of the transaction journal. When set using XBean, values of the form "20 Mb", "1024kb", and "1g" can be used.
-
getJournalWriteBatchSize
public int getJournalWriteBatchSize()
-
setJournalCleanupInterval
public void setJournalCleanupInterval(long journalCleanupInterval)
-
getJournalCleanupInterval
public long getJournalCleanupInterval()
-
setCheckForCorruption
public void setCheckForCorruption(boolean checkForCorruption)
-
isCheckForCorruption
public boolean isCheckForCorruption()
-
getAdapters
public List<PersistenceAdapter> getAdapters()
-
createDefaultLocker
public Locker createDefaultLocker() throws IOException
Description copied from interface: Lockable
Creates a default locker.
- Specified by: createDefaultLocker in interface Lockable
- Returns: default locker
- Throws: IOException
-
createJobSchedulerStore
public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException
Description copied from interface: PersistenceAdapter
Creates and returns a new Job Scheduler store instance.
- Specified by: createJobSchedulerStore in interface PersistenceAdapter
- Returns: a new JobSchedulerStore instance if this persistence adapter provides its own
- Throws:
  IOException - if an error occurs while creating the new JobSchedulerStore
  UnsupportedOperationException - if this adapter does not provide its own scheduler store implementation
-
isPersistNoLocal
public boolean isPersistNoLocal()
- Specified by: isPersistNoLocal in interface NoLocalSubscriptionAware
-
-