Class JournalMessageStore
- java.lang.Object
-
- org.apache.activemq.store.AbstractMessageStore
-
- org.apache.activemq.store.journal.JournalMessageStore
-
- All Implemented Interfaces:
Service
,MessageStore
- Direct Known Subclasses:
JournalTopicMessageStore
public class JournalMessageStore extends AbstractMessageStore
A MessageStore that uses a Journal to store it's messages.
-
-
Field Summary
Fields Modifier and Type Field Description protected Set<org.apache.activeio.journal.RecordLocation>
inFlightTxLocations
protected org.apache.activeio.journal.RecordLocation
lastLocation
protected MessageStore
longTermStore
protected JournalPersistenceAdapter
peristenceAdapter
protected JournalTransactionStore
transactionStore
protected TransactionTemplate
transactionTemplate
-
Fields inherited from class org.apache.activemq.store.AbstractMessageStore
destination, FUTURE, indexListener, messageStoreStatistics, prioritizedMessages
-
-
Constructor Summary
Constructors Constructor Description JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
addMessage(ConnectionContext context, Message message)
Not synchronized since the Journal has better throughput if you increase the number of concurrent writes that it is doing.void
addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef)
org.apache.activeio.journal.RecordLocation
checkpoint()
org.apache.activeio.journal.RecordLocation
checkpoint(Callback postCheckpointTest)
MessageStore
getLongTermMessageStore()
Message
getMessage(MessageId identity)
Looks up a message using either the String messageID or the messageNumber.int
getMessageCount()
String
getMessageReference(MessageId identity)
long
getMessageSize()
void
recover(MessageRecoveryListener listener)
Replays the checkpointStore first as those messages are the oldest ones, then messages are replayed from the transaction log and then the cache is updated.void
recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
void
removeAllMessages(ConnectionContext context)
Removes all the messages from the message store.void
removeMessage(ConnectionContext context, MessageAck ack)
Removes a message from the message store.void
replayAddMessage(ConnectionContext context, Message message)
void
replayRemoveMessage(ConnectionContext context, MessageAck messageAck)
void
resetBatching()
A hint to the Store to reset any batching state for the Destinationvoid
setBatch(MessageId messageId)
allow caching cursors to set the current batch offset when cache is exhaustedvoid
setMemoryUsage(MemoryUsage memoryUsage)
void
start()
void
stop()
-
Methods inherited from class org.apache.activemq.store.AbstractMessageStore
addMessage, asyncAddQueueMessage, asyncAddQueueMessage, asyncAddTopicMessage, asyncAddTopicMessage, dispose, getDestination, getIndexListener, getMessageStoreStatistics, isEmpty, isPrioritizedMessages, recoverMessageStoreStatistics, registerIndexListener, removeAsyncMessage, setPrioritizedMessages, updateMessage
-
-
-
-
Field Detail
-
peristenceAdapter
protected final JournalPersistenceAdapter peristenceAdapter
-
transactionStore
protected final JournalTransactionStore transactionStore
-
longTermStore
protected final MessageStore longTermStore
-
transactionTemplate
protected final TransactionTemplate transactionTemplate
-
lastLocation
protected org.apache.activeio.journal.RecordLocation lastLocation
-
inFlightTxLocations
protected Set<org.apache.activeio.journal.RecordLocation> inFlightTxLocations
-
-
Constructor Detail
-
JournalMessageStore
public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination)
-
-
Method Detail
-
setMemoryUsage
public void setMemoryUsage(MemoryUsage memoryUsage)
- Specified by:
setMemoryUsage
in interfaceMessageStore
- Overrides:
setMemoryUsage
in classAbstractMessageStore
- Parameters:
memoryUsage
- The SystemUsage that is controlling the destination's memory usage.
-
addMessage
public void addMessage(ConnectionContext context, Message message) throws IOException
Not synchronized since the Journal has better throughput if you increase the number of concurrent writes that it is doing.- Parameters:
context
- context- Throws:
IOException
-
replayAddMessage
public void replayAddMessage(ConnectionContext context, Message message)
-
removeMessage
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException
Description copied from interface:MessageStore
Removes a message from the message store.ack
- the ack request that cause the message to be removed. It conatins the identity which contains the messageID of the message that needs to be removed.- Throws:
IOException
-
replayRemoveMessage
public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck)
-
checkpoint
public org.apache.activeio.journal.RecordLocation checkpoint() throws IOException
- Returns:
- Throws:
IOException
-
checkpoint
public org.apache.activeio.journal.RecordLocation checkpoint(Callback postCheckpointTest) throws IOException
- Returns:
- Throws:
IOException
-
getMessage
public Message getMessage(MessageId identity) throws IOException
Description copied from interface:MessageStore
Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill in the missing key if its easy to do so.- Parameters:
identity
- which contains either the messageID or the messageNumber- Returns:
- the message or null if it does not exist
- Throws:
IOException
-
recover
public void recover(MessageRecoveryListener listener) throws Exception
Replays the checkpointStore first as those messages are the oldest ones, then messages are replayed from the transaction log and then the cache is updated.- Parameters:
listener
-- Throws:
Exception
-
start
public void start() throws Exception
- Specified by:
start
in interfaceService
- Overrides:
start
in classAbstractMessageStore
- Throws:
Exception
-
stop
public void stop() throws Exception
- Specified by:
stop
in interfaceService
- Overrides:
stop
in classAbstractMessageStore
- Throws:
Exception
-
getLongTermMessageStore
public MessageStore getLongTermMessageStore()
- Returns:
- Returns the longTermStore.
-
removeAllMessages
public void removeAllMessages(ConnectionContext context) throws IOException
Description copied from interface:MessageStore
Removes all the messages from the message store.- Throws:
IOException
- See Also:
MessageStore.removeAllMessages(ConnectionContext)
-
addMessageReference
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException
- Throws:
IOException
-
getMessageReference
public String getMessageReference(MessageId identity) throws IOException
- Throws:
IOException
-
getMessageCount
public int getMessageCount() throws IOException
- Specified by:
getMessageCount
in interfaceMessageStore
- Overrides:
getMessageCount
in classAbstractMessageStore
- Returns:
- Throws:
IOException
- See Also:
MessageStore.getMessageCount()
-
getMessageSize
public long getMessageSize() throws IOException
- Specified by:
getMessageSize
in interfaceMessageStore
- Overrides:
getMessageSize
in classAbstractMessageStore
- Returns:
- the size of the messages ready to deliver
- Throws:
IOException
-
recoverNextMessages
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception
- Throws:
Exception
-
resetBatching
public void resetBatching()
Description copied from interface:MessageStore
A hint to the Store to reset any batching state for the Destination
-
setBatch
public void setBatch(MessageId messageId) throws Exception
Description copied from interface:MessageStore
allow caching cursors to set the current batch offset when cache is exhausted- Specified by:
setBatch
in interfaceMessageStore
- Overrides:
setBatch
in classAbstractMessageStore
- Throws:
Exception
-
-