Class JournalPersistenceAdapter
- java.lang.Object
-
- org.apache.activemq.store.journal.JournalPersistenceAdapter
-
- All Implemented Interfaces:
org.apache.activeio.journal.JournalEventListener, BrokerServiceAware, Service, PersistenceAdapter, UsageListener
public class JournalPersistenceAdapter extends Object implements PersistenceAdapter, org.apache.activeio.journal.JournalEventListener, UsageListener, BrokerServiceAware
An implementation of PersistenceAdapter designed for use with a Journal, which then checkpoints asynchronously on a timeout to some other long-term persistent storage.
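The journal-first, checkpoint-later flow in the class description can be illustrated with a minimal, self-contained sketch. JournalCheckpointSketch and all of its members are hypothetical stand-ins written for this illustration, not ActiveMQ classes: an in-memory list plays the role of the Journal and a map plays the role of the long-term PersistenceAdapter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not an ActiveMQ class: models "journal first, checkpoint later".
class JournalCheckpointSketch {
    // Fast, append-only log of recent writes (stands in for the Journal).
    private final List<String[]> journal = new ArrayList<>();
    // Slower durable store (stands in for the long-term PersistenceAdapter).
    private final Map<String, String> longTerm = new LinkedHashMap<>();

    // A write only appends to the journal, so it returns quickly.
    synchronized void write(String id, String body) {
        journal.add(new String[] {id, body});
    }

    // A checkpoint drains every journalled record into long-term storage.
    // Returns the number of records moved.
    synchronized int checkpoint() {
        int moved = journal.size();
        for (String[] record : journal) {
            longTerm.put(record[0], record[1]);
        }
        journal.clear();
        return moved;
    }

    synchronized int journalSize() {
        return journal.size();
    }

    synchronized int longTermSize() {
        return longTerm.size();
    }
}
```

In this sketch checkpoint() is called by hand. In the adapter itself the equivalent step runs asynchronously on a timeout, presumably governed by the checkpoint interval, and is also taken when the journal reports an overflow.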
-
-
Constructor Summary
Constructors
JournalPersistenceAdapter()
JournalPersistenceAdapter(org.apache.activeio.journal.Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory)
-
Method Summary
Modifier and Type, Method, Description
void
allowIOResumption()
void
beginTransaction(ConnectionContext context)
This method starts a transaction on the persistent storage, which has nothing to do with JMS or XA transactions; it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.
void
checkpoint(boolean sync)
checkpoint any
void
checkpoint(boolean sync, boolean fullCheckpoint)
When we checkpoint we move all the journalled data to long term storage.
void
commitTransaction(ConnectionContext context)
Commit a persistence transaction
JobSchedulerStore
createJobSchedulerStore()
Creates and returns a new Job Scheduler store instance.
MessageStore
createQueueMessageStore(ActiveMQQueue destination)
Factory method to create a new queue message store with the given destination name
protected IOException
createRecoveryFailedException(Exception e)
TopicMessageStore
createTopicMessageStore(ActiveMQTopic destinationName)
Factory method to create a new topic message store with the given destination name
TransactionStore
createTransactionStore()
Factory method to create a new persistent prepared transaction store for XA recovery
protected IOException
createWriteException(String command, Exception e)
protected IOException
createWriteException(DataStructure packet, Exception e)
void
deleteAllMessages()
Deletes all the messages in the persistent store.
boolean
doCheckpoint()
This does the actual checkpoint.
long
getCheckpointInterval()
Set<ActiveMQDestination>
getDestinations()
Returns a set of all the ActiveMQDestination objects that the persistence store is aware of.
File
getDirectory()
long
getLastMessageBrokerSequenceId()
long
getLastProducerSequenceId(ProducerId id)
Returns the last stored producer sequence ID for this producer ID. Used to suppress duplicate sends at the transport when a failover reconnect occurs.
PersistenceAdapter
getLongTermPersistence()
int
getMaxCheckpointMessageAddSize()
int
getMaxCheckpointWorkers()
JournalTransactionStore
getTransactionStore()
SystemUsage
getUsageManager()
WireFormat
getWireFormat()
boolean
isUseExternalMessageReferences()
void
onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)
void
overflowNotification(org.apache.activeio.journal.RecordLocation safeLocation)
The Journal gives us a callback so that we can move old data out of the journal.
DataStructure
readCommand(org.apache.activeio.journal.RecordLocation location)
void
removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination
void
removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination
void
rollbackTransaction(ConnectionContext context)
Rollback a persistence transaction
void
setBrokerName(String brokerName)
Set the name of the broker using the adapter
void
setBrokerService(BrokerService brokerService)
void
setCheckpointInterval(long checkpointInterval)
void
setDirectory(File dir)
Set the directory where any data files should be created
void
setJournal(org.apache.activeio.journal.Journal journal)
void
setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize)
void
setMaxCheckpointWorkers(int maxCheckpointWorkers)
void
setPersistenceAdapter(PersistenceAdapter longTermPersistence)
void
setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
void
setUsageManager(SystemUsage usageManager)
void
setUseExternalMessageReferences(boolean enable)
long
size()
A hint to return the size of the store on disk
void
start()
void
stop()
ByteSequence
toByteSequence(org.apache.activeio.packet.Packet packet)
org.apache.activeio.packet.Packet
toPacket(ByteSequence sequence)
String
toString()
org.apache.activeio.journal.RecordLocation
writeCommand(DataStructure command, boolean sync)
-
-
-
Field Detail
-
scheduler
protected Scheduler scheduler
-
-
Constructor Detail
-
JournalPersistenceAdapter
public JournalPersistenceAdapter()
-
JournalPersistenceAdapter
public JournalPersistenceAdapter(org.apache.activeio.journal.Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException
- Throws:
IOException
-
-
Method Detail
-
setTaskRunnerFactory
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
-
setJournal
public void setJournal(org.apache.activeio.journal.Journal journal)
-
setPersistenceAdapter
public void setPersistenceAdapter(PersistenceAdapter longTermPersistence)
-
setUsageManager
public void setUsageManager(SystemUsage usageManager)
- Specified by:
setUsageManager in interface PersistenceAdapter
- Parameters:
usageManager - The UsageManager that is controlling the destination's memory usage.
-
getDestinations
public Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store is aware of.
- Specified by:
getDestinations in interface PersistenceAdapter
- Returns:
- active destinations
-
createQueueMessageStore
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name
- Specified by:
createQueueMessageStore in interface PersistenceAdapter
- Returns:
- the message store
- Throws:
IOException
-
createTopicMessageStore
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name
- Specified by:
createTopicMessageStore in interface PersistenceAdapter
- Returns:
- the topic message store
- Throws:
IOException
-
removeQueueMessageStore
public void removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination
- Specified by:
removeQueueMessageStore in interface PersistenceAdapter
- Parameters:
destination - Destination to forget
-
removeTopicMessageStore
public void removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination
- Specified by:
removeTopicMessageStore in interface PersistenceAdapter
- Parameters:
destination - Destination to forget
-
createTransactionStore
public TransactionStore createTransactionStore() throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery
- Specified by:
createTransactionStore in interface PersistenceAdapter
- Returns:
- transaction store
- Throws:
IOException
-
getLastMessageBrokerSequenceId
public long getLastMessageBrokerSequenceId() throws IOException
- Specified by:
getLastMessageBrokerSequenceId in interface PersistenceAdapter
- Returns:
- last broker sequence
- Throws:
IOException
-
beginTransaction
public void beginTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
This method starts a transaction on the persistent storage, which has nothing to do with JMS or XA transactions; it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization. Typically one transaction requires one disk synchronization point, so for high performance it is usually faster to perform many writes within the same transaction to minimize the latency caused by disk synchronization. This is especially true when using tools like Berkeley DB or embedded JDBC servers.
- Specified by:
beginTransaction in interface PersistenceAdapter
- Throws:
IOException
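The performance argument for batching writes in one persistence transaction can be illustrated with a small, self-contained sketch. BatchingStoreSketch is a hypothetical stand-in written for this illustration, not an ActiveMQ class; it only counts how many simulated disk synchronization points occur, under the assumption that each write outside a transaction costs one sync and each commit costs one sync regardless of how many writes it contains.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, not an ActiveMQ class: counts simulated disk syncs.
class BatchingStoreSketch {
    private final List<String> durable = new ArrayList<>();
    private List<String> pending = null; // non-null while a transaction is open
    private int syncCount = 0;

    void beginTransaction() {
        if (pending != null) {
            throw new IllegalStateException("transaction already open");
        }
        pending = new ArrayList<>();
    }

    // Inside a transaction the write is buffered; outside, it is synced at once.
    void write(String record) {
        if (pending != null) {
            pending.add(record);
        } else {
            durable.add(record);
            syncCount++;
        }
    }

    // All buffered writes become durable with a single simulated sync.
    void commitTransaction() {
        if (pending == null) {
            throw new IllegalStateException("no open transaction");
        }
        durable.addAll(pending);
        pending = null;
        syncCount++;
    }

    // Buffered writes are discarded and no sync is needed.
    void rollbackTransaction() {
        if (pending == null) {
            throw new IllegalStateException("no open transaction");
        }
        pending = null;
    }

    int syncCount() {
        return syncCount;
    }

    int durableSize() {
        return durable.size();
    }
}
```

With this model, three standalone writes cost three syncs, while three writes between beginTransaction() and commitTransaction() cost one. A caller of the real adapter would follow the same begin, write, commit-or-rollback shape, passing its ConnectionContext to each call.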
-
commitTransaction
public void commitTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Commit a persistence transaction
- Specified by:
commitTransaction in interface PersistenceAdapter
- Throws:
IOException
- See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)
-
rollbackTransaction
public void rollbackTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Rollback a persistence transaction
- Specified by:
rollbackTransaction in interface PersistenceAdapter
- Throws:
IOException
- See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)
-
start
public void start() throws Exception
-
getLongTermPersistence
public PersistenceAdapter getLongTermPersistence()
-
getWireFormat
public WireFormat getWireFormat()
- Returns:
- Returns the wireFormat.
-
overflowNotification
public void overflowNotification(org.apache.activeio.journal.RecordLocation safeLocation)
The Journal gives us a callback so that we can move old data out of the journal. Taking a checkpoint does this for us.
- Specified by:
overflowNotification in interface org.apache.activeio.journal.JournalEventListener
- See Also:
org.apache.activeio.journal.JournalEventListener#overflowNotification(org.apache.activeio.journal.RecordLocation)
-
checkpoint
public void checkpoint(boolean sync, boolean fullCheckpoint)
When we checkpoint we move all the journalled data to long term storage.
-
checkpoint
public void checkpoint(boolean sync)
Description copied from interface: PersistenceAdapter
checkpoint any
- Specified by:
checkpoint in interface PersistenceAdapter
-
doCheckpoint
public boolean doCheckpoint()
This does the actual checkpoint.
- Returns:
-
readCommand
public DataStructure readCommand(org.apache.activeio.journal.RecordLocation location) throws IOException
- Parameters:
location
- Returns:
- Throws:
IOException
-
createWriteException
protected IOException createWriteException(DataStructure packet, Exception e)
-
createWriteException
protected IOException createWriteException(String command, Exception e)
-
createRecoveryFailedException
protected IOException createRecoveryFailedException(Exception e)
-
writeCommand
public org.apache.activeio.journal.RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException
- Parameters:
command
sync
- Returns:
- Throws:
IOException
-
onUsageChanged
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)
- Specified by:
onUsageChanged in interface UsageListener
-
getTransactionStore
public JournalTransactionStore getTransactionStore()
-
deleteAllMessages
public void deleteAllMessages() throws IOException
Description copied from interface: PersistenceAdapter
Deletes all the messages in the persistent store.
- Specified by:
deleteAllMessages in interface PersistenceAdapter
- Throws:
IOException
-
getUsageManager
public SystemUsage getUsageManager()
-
getMaxCheckpointMessageAddSize
public int getMaxCheckpointMessageAddSize()
-
setMaxCheckpointMessageAddSize
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize)
-
getMaxCheckpointWorkers
public int getMaxCheckpointWorkers()
-
setMaxCheckpointWorkers
public void setMaxCheckpointWorkers(int maxCheckpointWorkers)
-
getCheckpointInterval
public long getCheckpointInterval()
-
setCheckpointInterval
public void setCheckpointInterval(long checkpointInterval)
-
isUseExternalMessageReferences
public boolean isUseExternalMessageReferences()
-
setUseExternalMessageReferences
public void setUseExternalMessageReferences(boolean enable)
-
toPacket
public org.apache.activeio.packet.Packet toPacket(ByteSequence sequence)
-
toByteSequence
public ByteSequence toByteSequence(org.apache.activeio.packet.Packet packet)
-
setBrokerName
public void setBrokerName(String brokerName)
Description copied from interface: PersistenceAdapter
Set the name of the broker using the adapter
- Specified by:
setBrokerName in interface PersistenceAdapter
-
setDirectory
public void setDirectory(File dir)
Description copied from interface: PersistenceAdapter
Set the directory where any data files should be created
- Specified by:
setDirectory in interface PersistenceAdapter
-
getDirectory
public File getDirectory()
- Specified by:
getDirectory in interface PersistenceAdapter
- Returns:
- the directory used by the persistence adapter
-
size
public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk
- Specified by:
size in interface PersistenceAdapter
- Returns:
- disk space used in bytes, or 0 if not implemented
-
setBrokerService
public void setBrokerService(BrokerService brokerService)
- Specified by:
setBrokerService in interface BrokerServiceAware
-
getLastProducerSequenceId
public long getLastProducerSequenceId(ProducerId id)
Description copied from interface: PersistenceAdapter
Returns the last stored producer sequence ID for this producer ID. Used to suppress duplicate sends at the transport when a failover reconnect occurs.
- Specified by:
getLastProducerSequenceId in interface PersistenceAdapter
- Parameters:
id - the producerId to find a sequenceId for
- Returns:
- the last stored sequence id or -1 if no suppression needed
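How a last-stored sequence ID can be used to suppress duplicate sends is sketched below. ProducerSequenceSketch and storeIfNew are hypothetical names written for this illustration, not ActiveMQ classes or methods, and plain strings stand in for ProducerId. The sketch assumes that sequence IDs are non-negative and increase with each send from a given producer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not an ActiveMQ class: tracks the last stored sequence ID per producer.
class ProducerSequenceSketch {
    private final Map<String, Long> lastStored = new HashMap<>();

    // Mirrors the documented contract: -1 when nothing is stored for the producer.
    long getLastProducerSequenceId(String producerId) {
        return lastStored.getOrDefault(producerId, -1L);
    }

    // Records the send unless its sequence ID was already stored.
    // Returns true if the message is new, false if it is a duplicate to suppress.
    boolean storeIfNew(String producerId, long sequenceId) {
        if (sequenceId <= getLastProducerSequenceId(producerId)) {
            return false;
        }
        lastStored.put(producerId, sequenceId);
        return true;
    }
}
```

After a reconnect, a producer that resends sequence ID 1 when 1 is already stored would be recognized as a duplicate, while an unknown producer yields -1 and nothing is suppressed.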
-
allowIOResumption
public void allowIOResumption()
- Specified by:
allowIOResumption in interface PersistenceAdapter
-
createJobSchedulerStore
public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException
Description copied from interface: PersistenceAdapter
Creates and returns a new Job Scheduler store instance.
- Specified by:
createJobSchedulerStore in interface PersistenceAdapter
- Returns:
- a new JobSchedulerStore instance if this Persistence adapter provides its own.
- Throws:
IOException - If an error occurs while creating the new JobSchedulerStore.
UnsupportedOperationException - If this adapter does not provide its own scheduler store implementation.
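Because the method may throw UnsupportedOperationException when an adapter has no scheduler store of its own, a caller might want a fallback. The following is a minimal sketch of that pattern only; SchedulerStoreFallbackSketch and createOrFallback are hypothetical names, and java.util.function.Supplier is used as a simplification, so the checked IOException that the real method declares is not modelled here.

```java
import java.util.function.Supplier;

// Hypothetical helper, not an ActiveMQ class: try the adapter's factory, else use a fallback.
class SchedulerStoreFallbackSketch {
    // Returns the adapter-provided store, or the fallback store when the
    // adapter signals that it does not provide one.
    static <T> T createOrFallback(Supplier<T> adapterFactory, Supplier<T> fallbackFactory) {
        try {
            return adapterFactory.get();
        } catch (UnsupportedOperationException e) {
            return fallbackFactory.get();
        }
    }
}
```

Whether this adapter actually throws UnsupportedOperationException is not stated on this page; the sketch only shows how a caller could handle either outcome.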
-
-