Package org.apache.activemq.store
Class ProxyTopicMessageStore
- java.lang.Object
-
- org.apache.activemq.store.ProxyMessageStore
-
- org.apache.activemq.store.ProxyTopicMessageStore
-
- All Implemented Interfaces:
Service
,MessageStore
,TopicMessageStore
public class ProxyTopicMessageStore extends ProxyMessageStore implements TopicMessageStore
A simple proxy that delegates to another MessageStore.
-
-
Constructor Summary
Constructors Constructor Description ProxyTopicMessageStore(TopicMessageStore delegate)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack)
Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching messages from the last checkpointvoid
addMessage(ConnectionContext context, Message message)
Adds a message to the message storevoid
addMessage(ConnectionContext context, Message message, boolean canOptimizeHint)
Adds a message to the message storevoid
addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive)
Inserts the subscriber info due to a subscription changeListenableFuture<Object>
asyncAddQueueMessage(ConnectionContext context, Message message)
Adds a message to the message storeListenableFuture<Object>
asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint)
Adds a message to the message storeListenableFuture<Object>
asyncAddTopicMessage(ConnectionContext context, Message message)
Adds a message to the message storeListenableFuture<Object>
asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint)
Adds a message to the message storevoid
deleteSubscription(String clientId, String subscriptionName)
void
dispose(ConnectionContext context)
SubscriptionInfo[]
getAllSubscriptions()
Lists all the durable subscriptions for a given destination.MessageStore
getDelegate()
ActiveMQDestination
getDestination()
The destination that the message store is holding messages for.Message
getMessage(MessageId identity)
Looks up a message using either the String messageID or the messageNumber.int
getMessageCount()
int
getMessageCount(String clientId, String subscriberName)
Get the number of messages ready to deliver from the store to a durable subscriberlong
getMessageSize()
long
getMessageSize(String clientId, String subscriberName)
Get the total size of the messages ready to deliver from the store to the durable subscriberMessageStoreStatistics
getMessageStoreStatistics()
MessageStoreSubscriptionStatistics
getMessageStoreSubStatistics()
The subscription metrics contained in this storeboolean
isEmpty()
flag to indicate if the store is emptyboolean
isPrioritizedMessages()
SubscriptionInfo
lookupSubscription(String clientId, String subscriptionName)
Finds the subscriber entry for the given consumer infovoid
recover(MessageRecoveryListener listener)
Recover any messages to be delivered.void
recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
void
recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener)
For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId messageIdvoid
recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
For the new subscription find the last acknowledged message ID and then find any new messages since then and dispatch them to the subscription.void
registerIndexListener(IndexListener indexListener)
void
removeAllMessages(ConnectionContext context)
Removes all the messages from the message store.void
removeAsyncMessage(ConnectionContext context, MessageAck ack)
void
removeMessage(ConnectionContext context, MessageAck ack)
Removes a message from the message store.void
resetBatching()
A hint to the Store to reset any batching state for the Destinationvoid
resetBatching(String clientId, String subscriptionName)
A hint to the Store to reset any batching state for a durable subscribervoid
setBatch(MessageId messageId)
allow caching cursors to set the current batch offset when cache is exhaustedvoid
setMemoryUsage(MemoryUsage memoryUsage)
void
setPrioritizedMessages(boolean prioritizedMessages)
A hint to the store to try recover messages according to priorityvoid
start()
void
stop()
void
updateMessage(Message message)
-
Methods inherited from class org.apache.activemq.store.ProxyMessageStore
toString
-
-
-
-
Constructor Detail
-
ProxyTopicMessageStore
public ProxyTopicMessageStore(TopicMessageStore delegate)
-
-
Method Detail
-
getDelegate
public MessageStore getDelegate()
- Overrides:
getDelegate
in classProxyMessageStore
-
addMessage
public void addMessage(ConnectionContext context, Message message) throws IOException
Description copied from interface:MessageStore
Adds a message to the message store- Specified by:
addMessage
in interfaceMessageStore
- Overrides:
addMessage
in classProxyMessageStore
- Parameters:
context
- context- Throws:
IOException
-
addMessage
public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException
Description copied from interface:MessageStore
Adds a message to the message store- Specified by:
addMessage
in interfaceMessageStore
- Overrides:
addMessage
in classProxyMessageStore
- Parameters:
context
- contextcanOptimizeHint
- - give a hint to the store that the message may be consumed before it hits the disk- Throws:
IOException
-
getMessage
public Message getMessage(MessageId identity) throws IOException
Description copied from interface:MessageStore
Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill in the missing key if its easy to do so.- Specified by:
getMessage
in interfaceMessageStore
- Overrides:
getMessage
in classProxyMessageStore
- Parameters:
identity
- which contains either the messageID or the messageNumber- Returns:
- the message or null if it does not exist
- Throws:
IOException
-
recover
public void recover(MessageRecoveryListener listener) throws Exception
Description copied from interface:MessageStore
Recover any messages to be delivered.- Specified by:
recover
in interfaceMessageStore
- Overrides:
recover
in classProxyMessageStore
- Throws:
Exception
-
removeAllMessages
public void removeAllMessages(ConnectionContext context) throws IOException
Description copied from interface:MessageStore
Removes all the messages from the message store.- Specified by:
removeAllMessages
in interfaceMessageStore
- Overrides:
removeAllMessages
in classProxyMessageStore
- Throws:
IOException
-
removeMessage
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException
Description copied from interface:MessageStore
Removes a message from the message store.- Specified by:
removeMessage
in interfaceMessageStore
- Overrides:
removeMessage
in classProxyMessageStore
ack
- the ack request that cause the message to be removed. It conatins the identity which contains the messageID of the message that needs to be removed.- Throws:
IOException
-
start
public void start() throws Exception
- Specified by:
start
in interfaceService
- Overrides:
start
in classProxyMessageStore
- Throws:
Exception
-
stop
public void stop() throws Exception
- Specified by:
stop
in interfaceService
- Overrides:
stop
in classProxyMessageStore
- Throws:
Exception
-
lookupSubscription
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException
Description copied from interface:TopicMessageStore
Finds the subscriber entry for the given consumer info- Specified by:
lookupSubscription
in interfaceTopicMessageStore
- Returns:
- the SubscriptionInfo
- Throws:
IOException
-
acknowledge
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException
Description copied from interface:TopicMessageStore
Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching messages from the last checkpoint- Specified by:
acknowledge
in interfaceTopicMessageStore
- Throws:
IOException
-
addSubscription
public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException
Description copied from interface:TopicMessageStore
Inserts the subscriber info due to a subscription change If this is a new subscription and the retroactive is false, then the last message sent to the topic should be set as the last message acknowledged by they new subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged message so that on recovery, all message recorded for the topic get replayed.- Specified by:
addSubscription
in interfaceTopicMessageStore
- Throws:
IOException
-
deleteSubscription
public void deleteSubscription(String clientId, String subscriptionName) throws IOException
- Specified by:
deleteSubscription
in interfaceTopicMessageStore
- Throws:
IOException
-
recoverSubscription
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception
Description copied from interface:TopicMessageStore
For the new subscription find the last acknowledged message ID and then find any new messages since then and dispatch them to the subscription. e.g. if we dispatched some messages to a new durable topic subscriber, then went down before acknowledging any messages, we need to know the correct point from which to recover from.- Specified by:
recoverSubscription
in interfaceTopicMessageStore
- Throws:
Exception
-
recoverNextMessages
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception
Description copied from interface:TopicMessageStore
For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId messageId- Specified by:
recoverNextMessages
in interfaceTopicMessageStore
- Throws:
Exception
-
resetBatching
public void resetBatching(String clientId, String subscriptionName)
Description copied from interface:TopicMessageStore
A hint to the Store to reset any batching state for a durable subscriber- Specified by:
resetBatching
in interfaceTopicMessageStore
-
getDestination
public ActiveMQDestination getDestination()
Description copied from interface:MessageStore
The destination that the message store is holding messages for.- Specified by:
getDestination
in interfaceMessageStore
- Overrides:
getDestination
in classProxyMessageStore
- Returns:
- the destination
-
getAllSubscriptions
public SubscriptionInfo[] getAllSubscriptions() throws IOException
Description copied from interface:TopicMessageStore
Lists all the durable subscriptions for a given destination.- Specified by:
getAllSubscriptions
in interfaceTopicMessageStore
- Returns:
- an array SubscriptionInfos
- Throws:
IOException
-
setMemoryUsage
public void setMemoryUsage(MemoryUsage memoryUsage)
- Specified by:
setMemoryUsage
in interfaceMessageStore
- Overrides:
setMemoryUsage
in classProxyMessageStore
- Parameters:
memoryUsage
- The SystemUsage that is controlling the destination's memory usage.
-
getMessageCount
public int getMessageCount(String clientId, String subscriberName) throws IOException
Description copied from interface:TopicMessageStore
Get the number of messages ready to deliver from the store to a durable subscriber- Specified by:
getMessageCount
in interfaceTopicMessageStore
- Returns:
- the outstanding message count
- Throws:
IOException
-
getMessageCount
public int getMessageCount() throws IOException
- Specified by:
getMessageCount
in interfaceMessageStore
- Overrides:
getMessageCount
in classProxyMessageStore
- Returns:
- the number of messages ready to deliver
- Throws:
IOException
-
getMessageSize
public long getMessageSize() throws IOException
- Specified by:
getMessageSize
in interfaceMessageStore
- Overrides:
getMessageSize
in classProxyMessageStore
- Returns:
- the size of the messages ready to deliver
- Throws:
IOException
-
recoverNextMessages
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception
- Specified by:
recoverNextMessages
in interfaceMessageStore
- Overrides:
recoverNextMessages
in classProxyMessageStore
- Throws:
Exception
-
dispose
public void dispose(ConnectionContext context)
- Specified by:
dispose
in interfaceMessageStore
- Overrides:
dispose
in classProxyMessageStore
-
resetBatching
public void resetBatching()
Description copied from interface:MessageStore
A hint to the Store to reset any batching state for the Destination- Specified by:
resetBatching
in interfaceMessageStore
- Overrides:
resetBatching
in classProxyMessageStore
-
setBatch
public void setBatch(MessageId messageId) throws Exception
Description copied from interface:MessageStore
allow caching cursors to set the current batch offset when cache is exhausted- Specified by:
setBatch
in interfaceMessageStore
- Overrides:
setBatch
in classProxyMessageStore
- Throws:
Exception
-
isEmpty
public boolean isEmpty() throws Exception
Description copied from interface:MessageStore
flag to indicate if the store is empty- Specified by:
isEmpty
in interfaceMessageStore
- Overrides:
isEmpty
in classProxyMessageStore
- Returns:
- true if the message count is 0
- Throws:
Exception
-
asyncAddTopicMessage
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException
Description copied from interface:MessageStore
Adds a message to the message store- Specified by:
asyncAddTopicMessage
in interfaceMessageStore
- Overrides:
asyncAddTopicMessage
in classProxyMessageStore
- Parameters:
context
- context- Returns:
- a ListenableFuture to track when this is complete
- Throws:
IOException
-
asyncAddTopicMessage
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException
Description copied from interface:MessageStore
Adds a message to the message store- Specified by:
asyncAddTopicMessage
in interfaceMessageStore
- Overrides:
asyncAddTopicMessage
in classProxyMessageStore
- Parameters:
context
- contextcanOptimizeHint
- - give a hint to the store that the message may be consumed before it hits the disk- Returns:
- a ListenableFuture to track when this is complete
- Throws:
IOException
-
asyncAddQueueMessage
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException
Description copied from interface:MessageStore
Adds a message to the message store- Specified by:
asyncAddQueueMessage
in interfaceMessageStore
- Overrides:
asyncAddQueueMessage
in classProxyMessageStore
- Parameters:
context
- context- Returns:
- a Future to track when this is complete
- Throws:
IOException
-
asyncAddQueueMessage
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException
Description copied from interface:MessageStore
Adds a message to the message store- Specified by:
asyncAddQueueMessage
in interfaceMessageStore
- Overrides:
asyncAddQueueMessage
in classProxyMessageStore
- Parameters:
context
- contextcanOptimizeHint
- - give a hint to the store that the message may be consumed before it hits the disk- Returns:
- a Future to track when this is complete
- Throws:
IOException
-
removeAsyncMessage
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException
- Specified by:
removeAsyncMessage
in interfaceMessageStore
- Overrides:
removeAsyncMessage
in classProxyMessageStore
- Throws:
IOException
-
setPrioritizedMessages
public void setPrioritizedMessages(boolean prioritizedMessages)
Description copied from interface:MessageStore
A hint to the store to try recover messages according to priority- Specified by:
setPrioritizedMessages
in interfaceMessageStore
- Overrides:
setPrioritizedMessages
in classProxyMessageStore
-
isPrioritizedMessages
public boolean isPrioritizedMessages()
- Specified by:
isPrioritizedMessages
in interfaceMessageStore
- Overrides:
isPrioritizedMessages
in classProxyMessageStore
- Returns:
- true if store is trying to recover messages according to priority
-
updateMessage
public void updateMessage(Message message) throws IOException
- Specified by:
updateMessage
in interfaceMessageStore
- Overrides:
updateMessage
in classProxyMessageStore
- Throws:
IOException
-
registerIndexListener
public void registerIndexListener(IndexListener indexListener)
- Specified by:
registerIndexListener
in interfaceMessageStore
- Overrides:
registerIndexListener
in classProxyMessageStore
-
getMessageStoreStatistics
public MessageStoreStatistics getMessageStoreStatistics()
- Specified by:
getMessageStoreStatistics
in interfaceMessageStore
- Overrides:
getMessageStoreStatistics
in classProxyMessageStore
- Returns:
- The statistics bean for this message store
-
getMessageSize
public long getMessageSize(String clientId, String subscriberName) throws IOException
Description copied from interface:TopicMessageStore
Get the total size of the messages ready to deliver from the store to the durable subscriber- Specified by:
getMessageSize
in interfaceTopicMessageStore
- Returns:
- Throws:
IOException
-
getMessageStoreSubStatistics
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics()
Description copied from interface:TopicMessageStore
The subscription metrics contained in this store- Specified by:
getMessageStoreSubStatistics
in interfaceTopicMessageStore
- Returns:
-
-