Package org.apache.activemq.store.kahadb
Class KahaDBStore.KahaDBMessageStore
- java.lang.Object
-
- org.apache.activemq.store.AbstractMessageStore
-
- org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore
-
- All Implemented Interfaces:
Service, MessageStore
- Enclosing class:
- KahaDBStore
public class KahaDBStore.KahaDBMessageStore extends AbstractMessageStore
-
-
Field Summary
Fields
Modifier and Type / Field / Description
protected HashMap<String,Set<String>> ackedAndPreparedMap
protected Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask> asyncTaskMap
protected KahaDestination dest
protected HashMap<String,Set<String>> rolledBackAcksMap
-
Fields inherited from class org.apache.activemq.store.AbstractMessageStore
destination, FUTURE, indexListener, messageStoreStatistics, prioritizedMessages
-
-
Constructor Summary
Constructors
Constructor / Description
KahaDBMessageStore(ActiveMQDestination destination)
-
Method Summary
All Methods / Instance Methods / Concrete Methods
Modifier and Type / Method / Description
protected void acquireLocalAsyncLock()
void addMessage(ConnectionContext context, Message message)
  Adds a message to the message store.
ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message)
  Adds a message to the message store.
void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback)
ActiveMQDestination getDestination()
  The destination that the message store is holding messages for.
Message getMessage(MessageId identity)
  Looks up a message using either the String messageID or the messageNumber.
boolean isEmpty()
  Flag to indicate if the store is empty.
protected void lockAsyncJobQueue()
void recover(MessageRecoveryListener listener)
  Recover any messages to be delivered.
protected void recoverMessageStoreStatistics()
void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
protected int recoverRolledBackAcks(String recoveredTxStateMapKey, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener)
protected void releaseLocalAsyncLock()
void removeAllMessages(ConnectionContext context)
  Removes all the messages from the message store.
void removeAsyncMessage(ConnectionContext context, MessageAck ack)
void removeMessage(ConnectionContext context, MessageAck ack)
  Removes a message from the message store.
void resetBatching()
  A hint to the Store to reset any batching state for the Destination.
void setBatch(MessageId identity)
  Allow caching cursors to set the current batch offset when cache is exhausted.
void setMemoryUsage(MemoryUsage memoryUsage)
void start()
void stop()
String toString()
void trackRecoveredAcks(ArrayList<MessageAck> acks)
protected void unlockAsyncJobQueue()
void updateMessage(Message message)
-
Methods inherited from class org.apache.activemq.store.AbstractMessageStore
addMessage, asyncAddQueueMessage, asyncAddTopicMessage, asyncAddTopicMessage, dispose, getIndexListener, getMessageCount, getMessageSize, getMessageStoreStatistics, isPrioritizedMessages, registerIndexListener, setPrioritizedMessages
-
-
-
-
Field Detail
-
asyncTaskMap
protected final Map<org.apache.activemq.store.kahadb.KahaDBStore.AsyncJobKey,KahaDBStore.StoreTask> asyncTaskMap
-
dest
protected KahaDestination dest
-
-
Constructor Detail
-
KahaDBMessageStore
public KahaDBMessageStore(ActiveMQDestination destination)
-
-
Method Detail
-
getDestination
public ActiveMQDestination getDestination()
Description copied from interface: MessageStore
The destination that the message store is holding messages for.
- Specified by:
getDestination in interface MessageStore
- Overrides:
getDestination in class AbstractMessageStore
- Returns:
- the destination
-
trackRecoveredAcks
public void trackRecoveredAcks(ArrayList<MessageAck> acks)
-
forgetRecoveredAcks
public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException
- Throws:
IOException
-
asyncAddQueueMessage
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException
Description copied from interface: MessageStore
Adds a message to the message store.
- Specified by:
asyncAddQueueMessage in interface MessageStore
- Overrides:
asyncAddQueueMessage in class AbstractMessageStore
- Parameters:
context - context
- Returns:
- a Future to track when this is complete
- Throws:
IOException
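
The returned future lets a caller queue several writes and wait for them later. The sketch below only models that pattern: the JDK's CompletableFuture stands in for ActiveMQ's ListenableFuture, and the AsyncAddSketch class with its asyncAdd and addAndWait methods is hypothetical, not part of ActiveMQ or KahaDB.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a message store; not an ActiveMQ class.
final class AsyncAddSketch {
    private final List<String> persisted = new ArrayList<>();
    private final ExecutorService writer = Executors.newSingleThreadExecutor();

    // Queues the write and returns a future that completes once it is applied.
    CompletableFuture<Object> asyncAdd(String message) {
        return CompletableFuture.<Object>supplyAsync(() -> {
            synchronized (persisted) {
                persisted.add(message);
            }
            return message;
        }, writer);
    }

    int size() {
        synchronized (persisted) {
            return persisted.size();
        }
    }

    void shutdown() {
        writer.shutdown();
    }

    // Adds n messages, waits for every future, and returns the stored count.
    static int addAndWait(int n) {
        AsyncAddSketch store = new AsyncAddSketch();
        try {
            List<CompletableFuture<Object>> pending = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                pending.add(store.asyncAdd("msg-" + i));
            }
            for (CompletableFuture<Object> f : pending) {
                f.join(); // block until this write has been applied
            }
            return store.size();
        } finally {
            store.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(addAndWait(3));
    }
}
```

Waiting on the futures only after all writes are queued is the point of the asynchronous variant; a caller that must block on each write gains nothing over addMessage.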
-
removeAsyncMessage
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException
- Specified by:
removeAsyncMessage in interface MessageStore
- Overrides:
removeAsyncMessage in class AbstractMessageStore
- Throws:
IOException
-
addMessage
public void addMessage(ConnectionContext context, Message message) throws IOException
Description copied from interface: MessageStore
Adds a message to the message store.
- Parameters:
context - context
- Throws:
IOException
-
updateMessage
public void updateMessage(Message message) throws IOException
- Specified by:
updateMessage in interface MessageStore
- Overrides:
updateMessage in class AbstractMessageStore
- Throws:
IOException
-
removeMessage
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException
Description copied from interface: MessageStore
Removes a message from the message store.
- Parameters:
ack - the ack request that caused the message to be removed. It contains the identity, which contains the messageID of the message that needs to be removed.
- Throws:
IOException
-
removeAllMessages
public void removeAllMessages(ConnectionContext context) throws IOException
Description copied from interface: MessageStore
Removes all the messages from the message store.
- Throws:
IOException
-
getMessage
public Message getMessage(MessageId identity) throws IOException
Description copied from interface: MessageStore
Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill in the missing key if it's easy to do so.
- Parameters:
identity - which contains either the messageID or the messageNumber
- Returns:
- the message or null if it does not exist
- Throws:
IOException
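
The "either key, fill in the missing one" contract can be illustrated with a small in-memory model. This is a sketch under stated assumptions: TwoKeyLookupSketch and its Identity type are hypothetical stand-ins for the store and MessageId, and the maps bear no relation to how KahaDB actually indexes messages.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a two-key lookup; none of these types are ActiveMQ classes.
final class TwoKeyLookupSketch {
    // Identity carrying either key; a null field means "not known yet".
    static final class Identity {
        String messageId;
        Long sequence;

        Identity(String messageId, Long sequence) {
            this.messageId = messageId;
            this.sequence = sequence;
        }
    }

    private final Map<String, Long> sequenceById = new HashMap<>();
    private final Map<Long, String> idBySequence = new HashMap<>();
    private final Map<Long, String> bodyBySequence = new HashMap<>();
    private long nextSequence = 1;

    void add(String messageId, String body) {
        long seq = nextSequence++;
        sequenceById.put(messageId, seq);
        idBySequence.put(seq, messageId);
        bodyBySequence.put(seq, body);
    }

    // Returns the body, or null if no such message exists.
    // On a hit, whichever key the caller left null is filled in.
    String get(Identity identity) {
        Long seq = identity.sequence;
        if (seq == null && identity.messageId != null) {
            seq = sequenceById.get(identity.messageId);
        }
        if (seq == null || !bodyBySequence.containsKey(seq)) {
            return null;
        }
        identity.sequence = seq;
        identity.messageId = idBySequence.get(seq);
        return bodyBySequence.get(seq);
    }

    public static void main(String[] args) {
        TwoKeyLookupSketch store = new TwoKeyLookupSketch();
        store.add("ID:a", "hello");
        Identity byId = new Identity("ID:a", null);
        System.out.println(store.get(byId) + " at sequence " + byId.sequence);
    }
}
```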
-
isEmpty
public boolean isEmpty() throws IOException
Description copied from class: AbstractMessageStore
Flag to indicate if the store is empty.
- Specified by:
isEmpty in interface MessageStore
- Overrides:
isEmpty in class AbstractMessageStore
- Returns:
- true if the message count is 0
- Throws:
IOException
-
recover
public void recover(MessageRecoveryListener listener) throws Exception
Description copied from interface: MessageStore
Recover any messages to be delivered.
- Throws:
Exception
-
recoverNextMessages
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception
- Throws:
Exception
-
recoverRolledBackAcks
protected int recoverRolledBackAcks(String recoveredTxStateMapKey, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception
- Throws:
Exception
-
resetBatching
public void resetBatching()
Description copied from interface: MessageStore
A hint to the Store to reset any batching state for the Destination.
-
setBatch
public void setBatch(MessageId identity) throws IOException
Description copied from interface: MessageStore
Allow caching cursors to set the current batch offset when cache is exhausted.
- Specified by:
setBatch in interface MessageStore
- Overrides:
setBatch in class AbstractMessageStore
- Throws:
IOException
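
Taken together, recoverNextMessages, resetBatching, and setBatch suggest a cursor that pages through the store in batches. A minimal sketch of that idea follows. BatchCursorSketch is hypothetical, it uses plain strings and a Consumer in place of Message and MessageRecoveryListener, and it assumes that setBatch positions the cursor just after the given message; the page above does not confirm that detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical paging cursor over an in-memory list; not KahaDB's index code.
final class BatchCursorSketch {
    private final List<String> messages = new ArrayList<>();
    private int cursor = 0; // index of the next message to hand out

    void add(String message) {
        messages.add(message);
    }

    // Delivers up to maxReturned messages past the cursor; returns how many were delivered.
    int recoverNext(int maxReturned, Consumer<String> listener) {
        int delivered = 0;
        while (delivered < maxReturned && cursor < messages.size()) {
            listener.accept(messages.get(cursor++));
            delivered++;
        }
        return delivered;
    }

    // Drops the batching state so the next recovery starts from the first message.
    void resetBatching() {
        cursor = 0;
    }

    // Assumption: moves the cursor to just after lastSeen; unknown messages are ignored.
    void setBatch(String lastSeen) {
        int index = messages.indexOf(lastSeen);
        if (index >= 0) {
            cursor = index + 1;
        }
    }

    public static void main(String[] args) {
        BatchCursorSketch store = new BatchCursorSketch();
        for (String m : new String[] {"a", "b", "c", "d", "e"}) {
            store.add(m);
        }
        List<String> batch = new ArrayList<>();
        store.recoverNext(2, batch::add);
        System.out.println(batch);
    }
}
```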
-
setMemoryUsage
public void setMemoryUsage(MemoryUsage memoryUsage)
- Specified by:
setMemoryUsage in interface MessageStore
- Overrides:
setMemoryUsage in class AbstractMessageStore
- Parameters:
memoryUsage - The SystemUsage that is controlling the destination's memory usage.
-
start
public void start() throws Exception
- Specified by:
start in interface Service
- Overrides:
start in class AbstractMessageStore
- Throws:
Exception
-
stop
public void stop() throws Exception
- Specified by:
stop in interface Service
- Overrides:
stop in class AbstractMessageStore
- Throws:
Exception
-
lockAsyncJobQueue
protected void lockAsyncJobQueue()
-
unlockAsyncJobQueue
protected void unlockAsyncJobQueue()
-
acquireLocalAsyncLock
protected void acquireLocalAsyncLock()
-
releaseLocalAsyncLock
protected void releaseLocalAsyncLock()
-
recoverMessageStoreStatistics
protected void recoverMessageStoreStatistics() throws IOException
- Overrides:
recoverMessageStoreStatistics in class AbstractMessageStore
- Throws:
IOException
-
-