org.apache.activemq.store.journal
Class JournalPersistenceAdapter

java.lang.Object
  extended by org.apache.activemq.store.journal.JournalPersistenceAdapter
All Implemented Interfaces:
org.apache.activeio.journal.JournalEventListener, BrokerServiceAware, Service, PersistenceAdapter, UsageListener

public class JournalPersistenceAdapter
extends Object
implements PersistenceAdapter, org.apache.activeio.journal.JournalEventListener, UsageListener, BrokerServiceAware

An implementation of PersistenceAdapter designed for use with a Journal, which then checkpoints asynchronously on a timeout to some other long-term persistent storage.


Field Summary
protected  Scheduler scheduler
           
 
Constructor Summary
JournalPersistenceAdapter()
           
JournalPersistenceAdapter(org.apache.activeio.journal.Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory)
           
 
Method Summary
 void beginTransaction(ConnectionContext context)
          This method starts a transaction on the persistent storage - which has nothing to do with JMS or XA transactions - it's purely a mechanism to perform multiple writes to a persistent store in 1 transaction as a performance optimization.
 void checkpoint(boolean sync)
          checkpoint any
 void checkpoint(boolean sync, boolean fullCheckpoint)
          When we checkpoint we move all the journalled data to long term storage.
 void commitTransaction(ConnectionContext context)
          Commit a persistence transaction
 MessageStore createQueueMessageStore(ActiveMQQueue destination)
          Factory method to create a new queue message store with the given destination name
protected  IOException createRecoveryFailedException(Exception e)
           
 TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName)
          Factory method to create a new topic message store with the given destination name
 TransactionStore createTransactionStore()
          Factory method to create a new persistent prepared transaction store for XA recovery
protected  IOException createWriteException(DataStructure packet, Exception e)
           
protected  IOException createWriteException(String command, Exception e)
           
 void deleteAllMessages()
          Deletes all the messages in the persistent store.
 boolean doCheckpoint()
          This does the actual checkpoint.
 Set<ActiveMQDestination> getDestinations()
          Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.
 File getDirectory()
           
 long getLastMessageBrokerSequenceId()
           
 long getLastProducerSequenceId(ProducerId id)
          Returns the last stored producer sequenceId for this producer Id; used to suppress duplicate sends at the transport when a failover reconnect occurs.
 PersistenceAdapter getLongTermPersistence()
           
 int getMaxCheckpointMessageAddSize()
           
 int getMaxCheckpointWorkers()
           
 JournalTransactionStore getTransactionStore()
           
 SystemUsage getUsageManager()
           
 WireFormat getWireFormat()
           
 boolean isUseExternalMessageReferences()
           
 void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)
           
 void overflowNotification(org.apache.activeio.journal.RecordLocation safeLocation)
          The Journal gives us a callback so that we can move old data out of the journal.
 DataStructure readCommand(org.apache.activeio.journal.RecordLocation location)
           
 void removeQueueMessageStore(ActiveMQQueue destination)
          Cleanup method to remove any state associated with the given destination
 void removeTopicMessageStore(ActiveMQTopic destination)
          Cleanup method to remove any state associated with the given destination
 void rollbackTransaction(ConnectionContext context)
          Rollback a persistence transaction
 void setBrokerName(String brokerName)
          Set the name of the broker using the adapter
 void setBrokerService(BrokerService brokerService)
           
 void setDirectory(File dir)
          Set the directory where any data files should be created
 void setJournal(org.apache.activeio.journal.Journal journal)
           
 void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize)
           
 void setMaxCheckpointWorkers(int maxCheckpointWorkers)
           
 void setPersistenceAdapter(PersistenceAdapter longTermPersistence)
           
 void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
           
 void setUsageManager(SystemUsage usageManager)
           
 void setUseExternalMessageReferences(boolean enable)
           
 long size()
          A hint to return the size of the store on disk
 void start()
           
 void stop()
           
 ByteSequence toByteSequence(org.apache.activeio.packet.Packet packet)
           
 org.apache.activeio.packet.Packet toPacket(ByteSequence sequence)
           
 String toString()
           
 org.apache.activeio.journal.RecordLocation writeCommand(DataStructure command, boolean sync)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

scheduler

protected Scheduler scheduler
Constructor Detail

JournalPersistenceAdapter

public JournalPersistenceAdapter()

JournalPersistenceAdapter

public JournalPersistenceAdapter(org.apache.activeio.journal.Journal journal,
                                 PersistenceAdapter longTermPersistence,
                                 TaskRunnerFactory taskRunnerFactory)
                          throws IOException
Throws:
IOException
Method Detail

setTaskRunnerFactory

public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)

setJournal

public void setJournal(org.apache.activeio.journal.Journal journal)

setPersistenceAdapter

public void setPersistenceAdapter(PersistenceAdapter longTermPersistence)

setUsageManager

public void setUsageManager(SystemUsage usageManager)
Specified by:
setUsageManager in interface PersistenceAdapter
Parameters:
usageManager - The UsageManager that is controlling the destination's memory usage.

getDestinations

public Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.

Specified by:
getDestinations in interface PersistenceAdapter
Returns:
active destinations

createQueueMessageStore

public MessageStore createQueueMessageStore(ActiveMQQueue destination)
                                     throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name

Specified by:
createQueueMessageStore in interface PersistenceAdapter
Returns:
the message store
Throws:
IOException

createTopicMessageStore

public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName)
                                          throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name

Specified by:
createTopicMessageStore in interface PersistenceAdapter
Returns:
the topic message store
Throws:
IOException

removeQueueMessageStore

public void removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination

Specified by:
removeQueueMessageStore in interface PersistenceAdapter
Parameters:
destination - Destination to forget

removeTopicMessageStore

public void removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination

Specified by:
removeTopicMessageStore in interface PersistenceAdapter
Parameters:
destination - Destination to forget

createTransactionStore

public TransactionStore createTransactionStore()
                                        throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery

Specified by:
createTransactionStore in interface PersistenceAdapter
Returns:
transaction store
Throws:
IOException

getLastMessageBrokerSequenceId

public long getLastMessageBrokerSequenceId()
                                    throws IOException
Specified by:
getLastMessageBrokerSequenceId in interface PersistenceAdapter
Returns:
last broker sequence
Throws:
IOException

beginTransaction

public void beginTransaction(ConnectionContext context)
                      throws IOException
Description copied from interface: PersistenceAdapter
This method starts a transaction on the persistent storage - which has nothing to do with JMS or XA transactions - it's purely a mechanism to perform multiple writes to a persistent store in 1 transaction as a performance optimization.

Typically one transaction will require one disk synchronization point, and so for really high performance it's usually faster to perform many writes within the same transaction to minimize latency caused by disk synchronization. This is especially true when using tools like Berkeley DB or embedded JDBC servers.

Specified by:
beginTransaction in interface PersistenceAdapter
Throws:
IOException

commitTransaction

public void commitTransaction(ConnectionContext context)
                       throws IOException
Description copied from interface: PersistenceAdapter
Commit a persistence transaction

Specified by:
commitTransaction in interface PersistenceAdapter
Throws:
IOException
See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)

rollbackTransaction

public void rollbackTransaction(ConnectionContext context)
                         throws IOException
Description copied from interface: PersistenceAdapter
Rollback a persistence transaction

Specified by:
rollbackTransaction in interface PersistenceAdapter
Throws:
IOException
See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)

start

public void start()
           throws Exception
Specified by:
start in interface Service
Throws:
Exception

stop

public void stop()
          throws Exception
Specified by:
stop in interface Service
Throws:
Exception

getLongTermPersistence

public PersistenceAdapter getLongTermPersistence()

getWireFormat

public WireFormat getWireFormat()
Returns:
Returns the wireFormat.

overflowNotification

public void overflowNotification(org.apache.activeio.journal.RecordLocation safeLocation)
The Journal gives us a callback so that we can move old data out of the journal. Taking a checkpoint does this for us.

Specified by:
overflowNotification in interface org.apache.activeio.journal.JournalEventListener
See Also:
org.apache.activeio.journal.JournalEventListener#overflowNotification(org.apache.activeio.journal.RecordLocation)

checkpoint

public void checkpoint(boolean sync,
                       boolean fullCheckpoint)
When we checkpoint we move all the journalled data to long term storage.


checkpoint

public void checkpoint(boolean sync)
Description copied from interface: PersistenceAdapter
checkpoint any

Specified by:
checkpoint in interface PersistenceAdapter

doCheckpoint

public boolean doCheckpoint()
This does the actual checkpoint.

Returns:

readCommand

public DataStructure readCommand(org.apache.activeio.journal.RecordLocation location)
                          throws IOException
Parameters:
location -
Returns:
Throws:
IOException

createWriteException

protected IOException createWriteException(DataStructure packet,
                                           Exception e)

createWriteException

protected IOException createWriteException(String command,
                                           Exception e)

createRecoveryFailedException

protected IOException createRecoveryFailedException(Exception e)

writeCommand

public org.apache.activeio.journal.RecordLocation writeCommand(DataStructure command,
                                                               boolean sync)
                                                        throws IOException
Parameters:
command -
sync -
Returns:
Throws:
IOException

onUsageChanged

public void onUsageChanged(Usage usage,
                           int oldPercentUsage,
                           int newPercentUsage)
Specified by:
onUsageChanged in interface UsageListener

getTransactionStore

public JournalTransactionStore getTransactionStore()

deleteAllMessages

public void deleteAllMessages()
                       throws IOException
Description copied from interface: PersistenceAdapter
Deletes all the messages in the persistent store.

Specified by:
deleteAllMessages in interface PersistenceAdapter
Throws:
IOException

getUsageManager

public SystemUsage getUsageManager()

getMaxCheckpointMessageAddSize

public int getMaxCheckpointMessageAddSize()

setMaxCheckpointMessageAddSize

public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize)

getMaxCheckpointWorkers

public int getMaxCheckpointWorkers()

setMaxCheckpointWorkers

public void setMaxCheckpointWorkers(int maxCheckpointWorkers)

isUseExternalMessageReferences

public boolean isUseExternalMessageReferences()

setUseExternalMessageReferences

public void setUseExternalMessageReferences(boolean enable)

toPacket

public org.apache.activeio.packet.Packet toPacket(ByteSequence sequence)

toByteSequence

public ByteSequence toByteSequence(org.apache.activeio.packet.Packet packet)

setBrokerName

public void setBrokerName(String brokerName)
Description copied from interface: PersistenceAdapter
Set the name of the broker using the adapter

Specified by:
setBrokerName in interface PersistenceAdapter

toString

public String toString()
Overrides:
toString in class Object

setDirectory

public void setDirectory(File dir)
Description copied from interface: PersistenceAdapter
Set the directory where any data files should be created

Specified by:
setDirectory in interface PersistenceAdapter

getDirectory

public File getDirectory()
Specified by:
getDirectory in interface PersistenceAdapter
Returns:
the directory used by the persistence adaptor

size

public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk

Specified by:
size in interface PersistenceAdapter
Returns:
disk space used in bytes, or 0 if not implemented

setBrokerService

public void setBrokerService(BrokerService brokerService)
Specified by:
setBrokerService in interface BrokerServiceAware

getLastProducerSequenceId

public long getLastProducerSequenceId(ProducerId id)
Description copied from interface: PersistenceAdapter
Returns the last stored producer sequenceId for this producer Id; used to suppress duplicate sends at the transport when a failover reconnect occurs.

Specified by:
getLastProducerSequenceId in interface PersistenceAdapter
Parameters:
id - the producerId to find a sequenceId for
Returns:
the last stored sequence id or -1 if no suppression needed


Copyright © 2005-2012 The Apache Software Foundation. All Rights Reserved.