org.apache.activemq.store.amq
Class AMQPersistenceAdapter

java.lang.Object
  extended by org.apache.activemq.store.amq.AMQPersistenceAdapter
All Implemented Interfaces:
BrokerServiceAware, Service, PersistenceAdapter, UsageListener

public class AMQPersistenceAdapter
extends Object
implements PersistenceAdapter, UsageListener, BrokerServiceAware

An implementation of PersistenceAdapter designed for use with a Journal and then check pointing asynchronously on a timeout with some other long term persistent storage.


Constructor Summary
AMQPersistenceAdapter()
           
 
Method Summary
protected  void addInProgressDataFile(AMQMessageStore store, int dataFileId)
           
 void beginTransaction(ConnectionContext context)
          This method starts a transaction on the persistent storage - which is nothing to do with JMS or XA transactions - its purely a mechanism to perform multiple writes to a persistent store in 1 transaction as a performance optimization.
 void checkpoint(boolean sync)
          When we checkpoint we move all the journalled data to long term storage.
 void cleanup()
          Cleans up the data files
 void commitTransaction(ConnectionContext context)
          Commit a persistence transaction
protected  AsyncDataManager createAsyncDataManager()
           
 MessageStore createQueueMessageStore(ActiveMQQueue destination)
          Factory method to create a new queue message store with the given destination name
protected  IOException createRecoveryFailedException(Exception e)
           
protected  KahaReferenceStoreAdapter createReferenceStoreAdapter()
           
 TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName)
          Factory method to create a new topic message store with the given destination name
 TransactionStore createTransactionStore()
          Factory method to create a new persistent prepared transaction store for XA recovery
protected  IOException createWriteException(DataStructure packet, Exception e)
           
protected  IOException createWriteException(String command, Exception e)
           
 void deleteAllMessages()
          Delete's all the messages in the persistent store.
 boolean doCheckpoint()
          This does the actual checkpoint.
protected  boolean doLock()
           
 AsyncDataManager getAsyncDataManager()
           
 String getBrokerName()
           
 BrokerService getBrokerService()
           
 long getCheckpointInterval()
           
 long getCleanupInterval()
           
 Set<ActiveMQDestination> getDestinations()
          Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.
 File getDirectory()
           
 File getDirectoryArchive()
           
 int getIndexBinSize()
           
 int getIndexKeySize()
           
 int getIndexLoadFactor()
           
 int getIndexMaxBinSize()
           
 int getIndexPageSize()
           
 int getJournalThreadPriority()
           
 long getLastMessageBrokerSequenceId()
           
 long getLastProducerSequenceId(ProducerId id)
          return the last stored producer sequenceId for this producer Id used to suppress duplicate sends on failover reconnect at the transport when a reconnect occurs
 int getMaxCheckpointMessageAddSize()
           
 int getMaxFileLength()
           
 int getMaxReferenceFileLength()
           
 ReferenceStoreAdapter getReferenceStoreAdapter()
           
 TaskRunnerFactory getTaskRunnerFactory()
           
 AMQTransactionStore getTransactionStore()
           
 SystemUsage getUsageManager()
           
 WireFormat getWireFormat()
           
 boolean isArchiveDataLogs()
           
 boolean isDisableLocking()
           
 boolean isForceRecoverReferenceStore()
           
 boolean isPersistentIndex()
           
 boolean isRecoverReferenceStore()
           
 boolean isSyncOnWrite()
           
 boolean isUseDedicatedTaskRunner()
           
 boolean isUseNio()
           
protected  void lock()
           
 void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)
           
 DataStructure readCommand(Location location)
           
protected  void removeInProgressDataFile(AMQMessageStore store, int dataFileId)
           
 void removeQueueMessageStore(ActiveMQQueue destination)
          Cleanup method to remove any state associated with the given destination
 void removeTopicMessageStore(ActiveMQTopic destination)
          Cleanup method to remove any state associated with the given destination
 void rollbackTransaction(ConnectionContext context)
          Rollback a persistence transaction
 void setArchiveDataLogs(boolean archiveDataLogs)
           
 void setAsyncDataManager(AsyncDataManager asyncDataManager)
           
 void setBrokerName(String brokerName)
          Set the name of the broker using the adapter
 void setBrokerService(BrokerService brokerService)
           
 void setCheckpointInterval(long checkpointInterval)
           
 void setCleanupInterval(long cleanupInterval)
           
 void setDirectory(File directory)
          Set the directory where any data files should be created
 void setDirectoryArchive(File directoryArchive)
           
 void setDisableLocking(boolean disableLocking)
           
 void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore)
           
 void setIndexBinSize(int indexBinSize)
           
 void setIndexKeySize(int indexKeySize)
           
 void setIndexLoadFactor(int factor)
           
 void setIndexMaxBinSize(int maxBinSize)
           
 void setIndexPageSize(int indexPageSize)
          When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setJournalThreadPriority(int journalThreadPriority)
           
 void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize)
          When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setMaxFileLength(int maxFileLength)
          When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setMaxReferenceFileLength(int maxReferenceFileLength)
          When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setPersistentIndex(boolean persistentIndex)
           
 void setRecoverReferenceStore(boolean recoverReferenceStore)
           
 void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter)
           
 void setSyncOnWrite(boolean syncOnWrite)
           
 void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
           
 void setUsageManager(SystemUsage usageManager)
           
 void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner)
           
 void setUseNio(boolean useNio)
           
 void setWireFormat(WireFormat wireFormat)
           
 long size()
          A hint to return the size of the store on disk
 void start()
           
 void stop()
           
 String toString()
           
 Location writeCommand(DataStructure command, boolean syncHint)
           
 Location writeCommand(DataStructure command, boolean syncHint, boolean forceSync)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Constructor Detail

AMQPersistenceAdapter

public AMQPersistenceAdapter()
Method Detail

getBrokerName

public String getBrokerName()

setBrokerName

public void setBrokerName(String brokerName)
Description copied from interface: PersistenceAdapter
Set the name of the broker using the adapter

Specified by:
setBrokerName in interface PersistenceAdapter

getBrokerService

public BrokerService getBrokerService()

setBrokerService

public void setBrokerService(BrokerService brokerService)
Specified by:
setBrokerService in interface BrokerServiceAware

start

public void start()
           throws Exception
Specified by:
start in interface Service
Throws:
Exception

stop

public void stop()
          throws Exception
Specified by:
stop in interface Service
Throws:
Exception

checkpoint

public void checkpoint(boolean sync)
When we checkpoint we move all the journalled data to long term storage.

Specified by:
checkpoint in interface PersistenceAdapter
Parameters:
sync -

doCheckpoint

public boolean doCheckpoint()
This does the actual checkpoint.

Returns:
true if successful

cleanup

public void cleanup()
Cleans up the data files

Throws:
IOException

getDestinations

public Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.

Specified by:
getDestinations in interface PersistenceAdapter
Returns:
active destinations

createQueueMessageStore

public MessageStore createQueueMessageStore(ActiveMQQueue destination)
                                     throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name

Specified by:
createQueueMessageStore in interface PersistenceAdapter
Returns:
the message store
Throws:
IOException

createTopicMessageStore

public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName)
                                          throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name

Specified by:
createTopicMessageStore in interface PersistenceAdapter
Returns:
the topic message store
Throws:
IOException

removeQueueMessageStore

public void removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination

Specified by:
removeQueueMessageStore in interface PersistenceAdapter
Parameters:
destination -

removeTopicMessageStore

public void removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination

Specified by:
removeTopicMessageStore in interface PersistenceAdapter
Parameters:
destination -

createTransactionStore

public TransactionStore createTransactionStore()
                                        throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery

Specified by:
createTransactionStore in interface PersistenceAdapter
Returns:
transaction store
Throws:
IOException

getLastMessageBrokerSequenceId

public long getLastMessageBrokerSequenceId()
                                    throws IOException
Specified by:
getLastMessageBrokerSequenceId in interface PersistenceAdapter
Returns:
last broker sequence
Throws:
IOException

beginTransaction

public void beginTransaction(ConnectionContext context)
                      throws IOException
Description copied from interface: PersistenceAdapter
This method starts a transaction on the persistent storage - which is nothing to do with JMS or XA transactions - its purely a mechanism to perform multiple writes to a persistent store in 1 transaction as a performance optimization.

Typically one transaction will require one disk synchronization point and so for real high performance its usually faster to perform many writes within the same transaction to minimize latency caused by disk synchronization. This is especially true when using tools like Berkeley Db or embedded JDBC servers.

Specified by:
beginTransaction in interface PersistenceAdapter
Throws:
IOException

commitTransaction

public void commitTransaction(ConnectionContext context)
                       throws IOException
Description copied from interface: PersistenceAdapter
Commit a persistence transaction

Specified by:
commitTransaction in interface PersistenceAdapter
Throws:
IOException
See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)

rollbackTransaction

public void rollbackTransaction(ConnectionContext context)
                         throws IOException
Description copied from interface: PersistenceAdapter
Rollback a persistence transaction

Specified by:
rollbackTransaction in interface PersistenceAdapter
Throws:
IOException
See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)

isPersistentIndex

public boolean isPersistentIndex()

setPersistentIndex

public void setPersistentIndex(boolean persistentIndex)

readCommand

public DataStructure readCommand(Location location)
                          throws IOException
Parameters:
location -
Returns:
Throws:
IOException

createWriteException

protected IOException createWriteException(DataStructure packet,
                                           Exception e)

createWriteException

protected IOException createWriteException(String command,
                                           Exception e)

createRecoveryFailedException

protected IOException createRecoveryFailedException(Exception e)

writeCommand

public Location writeCommand(DataStructure command,
                             boolean syncHint)
                      throws IOException
Parameters:
command -
syncHint -
Returns:
Throws:
IOException

writeCommand

public Location writeCommand(DataStructure command,
                             boolean syncHint,
                             boolean forceSync)
                      throws IOException
Throws:
IOException

onUsageChanged

public void onUsageChanged(Usage usage,
                           int oldPercentUsage,
                           int newPercentUsage)
Specified by:
onUsageChanged in interface UsageListener

getTransactionStore

public AMQTransactionStore getTransactionStore()

deleteAllMessages

public void deleteAllMessages()
                       throws IOException
Description copied from interface: PersistenceAdapter
Delete's all the messages in the persistent store.

Specified by:
deleteAllMessages in interface PersistenceAdapter
Throws:
IOException

toString

public String toString()
Overrides:
toString in class Object

createAsyncDataManager

protected AsyncDataManager createAsyncDataManager()

createReferenceStoreAdapter

protected KahaReferenceStoreAdapter createReferenceStoreAdapter()
                                                         throws IOException
Throws:
IOException

getAsyncDataManager

public AsyncDataManager getAsyncDataManager()

setAsyncDataManager

public void setAsyncDataManager(AsyncDataManager asyncDataManager)

getReferenceStoreAdapter

public ReferenceStoreAdapter getReferenceStoreAdapter()

getTaskRunnerFactory

public TaskRunnerFactory getTaskRunnerFactory()

setTaskRunnerFactory

public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)

getWireFormat

public WireFormat getWireFormat()
Returns:
Returns the wireFormat.

setWireFormat

public void setWireFormat(WireFormat wireFormat)

getUsageManager

public SystemUsage getUsageManager()

setUsageManager

public void setUsageManager(SystemUsage usageManager)
Specified by:
setUsageManager in interface PersistenceAdapter
Parameters:
usageManager - The UsageManager that is controlling the broker's memory usage.

getMaxCheckpointMessageAddSize

public int getMaxCheckpointMessageAddSize()

setMaxCheckpointMessageAddSize

public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize)
When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used


getDirectory

public File getDirectory()
Specified by:
getDirectory in interface PersistenceAdapter
Returns:
the directory used by the persistence adaptor

setDirectory

public void setDirectory(File directory)
Description copied from interface: PersistenceAdapter
Set the directory where any data files should be created

Specified by:
setDirectory in interface PersistenceAdapter

isSyncOnWrite

public boolean isSyncOnWrite()

setSyncOnWrite

public void setSyncOnWrite(boolean syncOnWrite)

setReferenceStoreAdapter

public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter)
Parameters:
referenceStoreAdapter - the referenceStoreAdapter to set

size

public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk

Specified by:
size in interface PersistenceAdapter
Returns:
disk space used in bytes of 0 if not implemented

isUseNio

public boolean isUseNio()

setUseNio

public void setUseNio(boolean useNio)

getMaxFileLength

public int getMaxFileLength()

setMaxFileLength

public void setMaxFileLength(int maxFileLength)
When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used


getCleanupInterval

public long getCleanupInterval()

setCleanupInterval

public void setCleanupInterval(long cleanupInterval)

getCheckpointInterval

public long getCheckpointInterval()

setCheckpointInterval

public void setCheckpointInterval(long checkpointInterval)

getIndexBinSize

public int getIndexBinSize()

setIndexBinSize

public void setIndexBinSize(int indexBinSize)

getIndexKeySize

public int getIndexKeySize()

setIndexKeySize

public void setIndexKeySize(int indexKeySize)

getIndexPageSize

public int getIndexPageSize()

getIndexMaxBinSize

public int getIndexMaxBinSize()

setIndexMaxBinSize

public void setIndexMaxBinSize(int maxBinSize)

setIndexPageSize

public void setIndexPageSize(int indexPageSize)
When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used


setIndexLoadFactor

public void setIndexLoadFactor(int factor)

getIndexLoadFactor

public int getIndexLoadFactor()

getMaxReferenceFileLength

public int getMaxReferenceFileLength()

setMaxReferenceFileLength

public void setMaxReferenceFileLength(int maxReferenceFileLength)
When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used


getDirectoryArchive

public File getDirectoryArchive()

setDirectoryArchive

public void setDirectoryArchive(File directoryArchive)

isArchiveDataLogs

public boolean isArchiveDataLogs()

setArchiveDataLogs

public void setArchiveDataLogs(boolean archiveDataLogs)

isDisableLocking

public boolean isDisableLocking()

setDisableLocking

public void setDisableLocking(boolean disableLocking)

isRecoverReferenceStore

public boolean isRecoverReferenceStore()
Returns:
the recoverReferenceStore

setRecoverReferenceStore

public void setRecoverReferenceStore(boolean recoverReferenceStore)
Parameters:
recoverReferenceStore - the recoverReferenceStore to set

isForceRecoverReferenceStore

public boolean isForceRecoverReferenceStore()
Returns:
the forceRecoverReferenceStore

setForceRecoverReferenceStore

public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore)
Parameters:
forceRecoverReferenceStore - the forceRecoverReferenceStore to set

isUseDedicatedTaskRunner

public boolean isUseDedicatedTaskRunner()

setUseDedicatedTaskRunner

public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner)

getJournalThreadPriority

public int getJournalThreadPriority()
Returns:
the journalThreadPriority

setJournalThreadPriority

public void setJournalThreadPriority(int journalThreadPriority)
Parameters:
journalThreadPriority - the journalThreadPriority to set

addInProgressDataFile

protected void addInProgressDataFile(AMQMessageStore store,
                                     int dataFileId)

removeInProgressDataFile

protected void removeInProgressDataFile(AMQMessageStore store,
                                        int dataFileId)

lock

protected void lock()
             throws Exception
Throws:
Exception

doLock

protected boolean doLock()
                  throws IOException
Throws:
IOException

getLastProducerSequenceId

public long getLastProducerSequenceId(ProducerId id)
Description copied from interface: PersistenceAdapter
return the last stored producer sequenceId for this producer Id used to suppress duplicate sends on failover reconnect at the transport when a reconnect occurs

Specified by:
getLastProducerSequenceId in interface PersistenceAdapter
Parameters:
id - the producerId to find a sequenceId for
Returns:
the last stored sequence id or -1 if no suppression needed


Copyright © 2005-2012 The Apache Software Foundation. All Rights Reserved.