Package org.apache.activemq.store
Class ProxyTopicMessageStore
- java.lang.Object
  - org.apache.activemq.store.ProxyMessageStore
    - org.apache.activemq.store.ProxyTopicMessageStore

All Implemented Interfaces:
Service, MessageStore, TopicMessageStore
public class ProxyTopicMessageStore extends ProxyMessageStore implements TopicMessageStore
A simple proxy that delegates to another MessageStore.
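
To illustrate the delegation pattern this class follows, here is a minimal sketch. It does not use the real ActiveMQ types: `SimpleStore`, `SimpleTopicStore`, `InMemoryTopicStore`, `SimpleTopicStoreProxy` and `CountingTopicStore` are hypothetical, heavily reduced stand-ins invented for this example. The idea shown is only that the proxy forwards every call to its delegate, so a subclass can override a single method to add behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MessageStore (reduced to two methods).
interface SimpleStore {
    void addMessage(String message);
    int getMessageCount();
}

// Hypothetical stand-in for TopicMessageStore.
interface SimpleTopicStore extends SimpleStore {
    void addSubscription(String clientId, String subscriptionName);
    List<String> getAllSubscriptions();
}

// In-memory delegate used only for this illustration.
class InMemoryTopicStore implements SimpleTopicStore {
    private final List<String> messages = new ArrayList<>();
    private final List<String> subscriptions = new ArrayList<>();

    public void addMessage(String message) { messages.add(message); }
    public int getMessageCount() { return messages.size(); }
    public void addSubscription(String clientId, String subscriptionName) {
        subscriptions.add(clientId + ":" + subscriptionName);
    }
    public List<String> getAllSubscriptions() { return new ArrayList<>(subscriptions); }
}

// The proxy: every method simply forwards to the wrapped delegate.
class SimpleTopicStoreProxy implements SimpleTopicStore {
    private final SimpleTopicStore delegate;

    SimpleTopicStoreProxy(SimpleTopicStore delegate) { this.delegate = delegate; }

    public SimpleTopicStore getDelegate() { return delegate; }
    public void addMessage(String message) { delegate.addMessage(message); }
    public int getMessageCount() { return delegate.getMessageCount(); }
    public void addSubscription(String clientId, String subscriptionName) {
        delegate.addSubscription(clientId, subscriptionName);
    }
    public List<String> getAllSubscriptions() { return delegate.getAllSubscriptions(); }
}

// A subclass overrides one method and inherits forwarding for the rest.
class CountingTopicStore extends SimpleTopicStoreProxy {
    private int adds;

    CountingTopicStore(SimpleTopicStore delegate) { super(delegate); }

    @Override
    public void addMessage(String message) {
        adds++;                    // extra behaviour added by the subclass
        super.addMessage(message); // then forward to the delegate
    }

    int getAdds() { return adds; }
}
```

Wrapping an `InMemoryTopicStore` in a `CountingTopicStore` and calling `addMessage` on the wrapper leaves the message in the backing store while the wrapper records the call.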
-
-
Constructor Summary
Constructors
- ProxyTopicMessageStore(TopicMessageStore delegate)
-
Method Summary
All Methods (Instance Methods, Concrete Methods). Each entry gives the modifier and type, the method, and its description.
- void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack): Stores the last acknowledged message ID for the given subscription so that we can recover and commence dispatching messages from the last checkpoint.
- void addMessage(ConnectionContext context, Message message): Adds a message to the message store.
- void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint): Adds a message to the message store.
- void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive): Inserts the subscriber info due to a subscription change.
- ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message): Adds a message to the message store.
- ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint): Adds a message to the message store.
- ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message): Adds a message to the message store.
- ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint): Adds a message to the message store.
- void deleteSubscription(String clientId, String subscriptionName)
- void dispose(ConnectionContext context)
- SubscriptionInfo[] getAllSubscriptions(): Lists all the durable subscriptions for a given destination.
- MessageStore getDelegate()
- ActiveMQDestination getDestination(): The destination that the message store is holding messages for.
- Message getMessage(MessageId identity): Looks up a message using either the String messageID or the messageNumber.
- int getMessageCount()
- int getMessageCount(String clientId, String subscriberName): Gets the number of messages ready to deliver from the store to a durable subscriber.
- long getMessageSize()
- long getMessageSize(String clientId, String subscriberName): Gets the total size of the messages ready to deliver from the store to the durable subscriber.
- MessageStoreStatistics getMessageStoreStatistics()
- MessageStoreSubscriptionStatistics getMessageStoreSubStatistics(): The subscription metrics contained in this store.
- boolean isEmpty(): Flag to indicate if the store is empty.
- boolean isPrioritizedMessages()
- SubscriptionInfo lookupSubscription(String clientId, String subscriptionName): Finds the subscriber entry for the given consumer info.
- void recover(MessageRecoveryListener listener): Recovers any messages to be delivered.
- void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
- void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener): For an active subscription, retrieves messages from the store for the subscriber after the lastMessageId.
- void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener): For the new subscription, finds the last acknowledged message ID, then finds any new messages since then and dispatches them to the subscription.
- void registerIndexListener(IndexListener indexListener)
- void removeAllMessages(ConnectionContext context): Removes all the messages from the message store.
- void removeAsyncMessage(ConnectionContext context, MessageAck ack)
- void removeMessage(ConnectionContext context, MessageAck ack): Removes a message from the message store.
- void resetBatching(): A hint to the store to reset any batching state for the destination.
- void resetBatching(String clientId, String subscriptionName): A hint to the store to reset any batching state for a durable subscriber.
- void setBatch(MessageId messageId): Allows caching cursors to set the current batch offset when the cache is exhausted.
- void setMemoryUsage(MemoryUsage memoryUsage)
- void setPrioritizedMessages(boolean prioritizedMessages): A hint to the store to try to recover messages according to priority.
- void start()
- void stop()
- void updateMessage(Message message)
Methods inherited from class org.apache.activemq.store.ProxyMessageStore
toString
-
Constructor Detail
-
ProxyTopicMessageStore
public ProxyTopicMessageStore(TopicMessageStore delegate)
-
-
Method Detail
-
getDelegate
public MessageStore getDelegate()
- Overrides: getDelegate in class ProxyMessageStore
-
addMessage
public void addMessage(ConnectionContext context, Message message) throws IOException
Description copied from interface: MessageStore
Adds a message to the message store.
- Specified by: addMessage in interface MessageStore
- Overrides: addMessage in class ProxyMessageStore
- Parameters: context - context
- Throws: IOException
-
addMessage
public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException
Description copied from interface: MessageStore
Adds a message to the message store.
- Specified by: addMessage in interface MessageStore
- Overrides: addMessage in class ProxyMessageStore
- Parameters: context - context; canOptimizeHint - gives a hint to the store that the message may be consumed before it hits the disk
- Throws: IOException
-
getMessage
public Message getMessage(MessageId identity) throws IOException
Description copied from interface: MessageStore
Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill in the missing key if it's easy to do so.
- Specified by: getMessage in interface MessageStore
- Overrides: getMessage in class ProxyMessageStore
- Parameters: identity - which contains either the messageID or the messageNumber
- Returns: the message, or null if it does not exist
- Throws: IOException
-
recover
public void recover(MessageRecoveryListener listener) throws Exception
Description copied from interface: MessageStore
Recover any messages to be delivered.
- Specified by: recover in interface MessageStore
- Overrides: recover in class ProxyMessageStore
- Throws: Exception
-
removeAllMessages
public void removeAllMessages(ConnectionContext context) throws IOException
Description copied from interface: MessageStore
Removes all the messages from the message store.
- Specified by: removeAllMessages in interface MessageStore
- Overrides: removeAllMessages in class ProxyMessageStore
- Throws: IOException
-
removeMessage
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException
Description copied from interface: MessageStore
Removes a message from the message store.
- Specified by: removeMessage in interface MessageStore
- Overrides: removeMessage in class ProxyMessageStore
- Parameters: ack - the ack request that caused the message to be removed. It contains the identity, which contains the messageID of the message that needs to be removed.
- Throws: IOException
-
start
public void start() throws Exception
- Specified by: start in interface Service
- Overrides: start in class ProxyMessageStore
- Throws: Exception
-
stop
public void stop() throws Exception
- Specified by: stop in interface Service
- Overrides: stop in class ProxyMessageStore
- Throws: Exception
-
lookupSubscription
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException
Description copied from interface: TopicMessageStore
Finds the subscriber entry for the given consumer info.
- Specified by: lookupSubscription in interface TopicMessageStore
- Returns: the SubscriptionInfo
- Throws: IOException
-
acknowledge
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException
Description copied from interface: TopicMessageStore
Stores the last acknowledged message ID for the given subscription so that we can recover and commence dispatching messages from the last checkpoint.
- Specified by: acknowledge in interface TopicMessageStore
- Throws: IOException
-
addSubscription
public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException
Description copied from interface: TopicMessageStore
Inserts the subscriber info due to a subscription change. If this is a new subscription and retroactive is false, then the last message sent to the topic should be set as the last message acknowledged by the new subscription. Otherwise, if retroactive is true, the subscription is created without an acknowledged message so that, on recovery, all messages recorded for the topic are replayed.
- Specified by: addSubscription in interface TopicMessageStore
- Throws: IOException
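
The effect of the retroactive flag can be sketched with a small in-memory model. This is an illustration under stated assumptions, not ActiveMQ code: `RetroactiveSketch` is a hypothetical class, messages are plain strings, a single `subscriptionKey` string stands in for the clientId and subscriptionName pair, and the checkpoint is modelled as a list index rather than a MessageId.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the retroactive rule described above.
class RetroactiveSketch {
    private final List<String> messages = new ArrayList<>();
    // subscription key -> index of the last acknowledged message (-1 = none acknowledged)
    private final Map<String, Integer> lastAcked = new HashMap<>();

    void addMessage(String message) {
        messages.add(message);
    }

    void addSubscription(String subscriptionKey, boolean retroactive) {
        if (lastAcked.containsKey(subscriptionKey)) {
            return; // assumption: an existing subscription keeps its checkpoint
        }
        // Non-retroactive: treat the latest message as already acknowledged.
        // Retroactive: no acknowledged message, so everything is replayed.
        lastAcked.put(subscriptionKey, retroactive ? -1 : messages.size() - 1);
    }

    void acknowledge(String subscriptionKey, int messageIndex) {
        lastAcked.put(subscriptionKey, messageIndex);
    }

    // Returns every message stored after the subscription's checkpoint.
    List<String> recoverSubscription(String subscriptionKey) {
        Integer checkpoint = lastAcked.get(subscriptionKey);
        if (checkpoint == null) {
            throw new IllegalArgumentException("unknown subscription: " + subscriptionKey);
        }
        return new ArrayList<>(messages.subList(checkpoint + 1, messages.size()));
    }
}
```

In this model a subscription added with retroactive set to false only recovers messages stored after it was created, while a retroactive one recovers the full backlog until it acknowledges something.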
-
deleteSubscription
public void deleteSubscription(String clientId, String subscriptionName) throws IOException
- Specified by: deleteSubscription in interface TopicMessageStore
- Throws: IOException
-
recoverSubscription
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception
Description copied from interface: TopicMessageStore
For the new subscription, finds the last acknowledged message ID, then finds any new messages since then and dispatches them to the subscription. For example, if we dispatched some messages to a new durable topic subscriber and then went down before any of them were acknowledged, we need to know the correct point from which to recover.
- Specified by: recoverSubscription in interface TopicMessageStore
- Throws: Exception
-
recoverNextMessages
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception
Description copied from interface: TopicMessageStore
For an active subscription, retrieves messages from the store for the subscriber after the lastMessageId.
- Specified by: recoverNextMessages in interface TopicMessageStore
- Throws: Exception
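
One plausible reading of how batched recovery and resetBatching interact is sketched below. Everything here is an assumption made for illustration: `BatchingSketch` is a hypothetical class, the per-subscriber cursor and its reset behaviour are a simplified model rather than the behaviour of any particular ActiveMQ store, and results are returned as a list instead of being pushed to a MessageRecoveryListener.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each subscriber has a batch cursor that advances on every
// recoverNextMessages call and falls back to the ack checkpoint when reset.
class BatchingSketch {
    private final List<String> messages = new ArrayList<>();
    private final Map<String, Integer> lastAcked = new HashMap<>();   // index of last acknowledged message
    private final Map<String, Integer> batchCursor = new HashMap<>(); // index of next message to hand out

    void addMessage(String message) {
        messages.add(message);
    }

    void acknowledge(String subscriptionKey, int messageIndex) {
        lastAcked.put(subscriptionKey, messageIndex);
    }

    // Returns at most maxReturned messages, starting where the previous batch ended.
    List<String> recoverNextMessages(String subscriptionKey, int maxReturned) {
        int afterAck = lastAcked.getOrDefault(subscriptionKey, -1) + 1;
        int start = Math.min(batchCursor.getOrDefault(subscriptionKey, afterAck), messages.size());
        int end = Math.min(messages.size(), start + Math.max(0, maxReturned));
        batchCursor.put(subscriptionKey, end);
        return new ArrayList<>(messages.subList(start, end));
    }

    // Drops the cursor so the next batch starts right after the last acknowledged message.
    void resetBatching(String subscriptionKey) {
        batchCursor.remove(subscriptionKey);
    }
}
```

The design choice in this sketch is to keep the batch cursor separate from the acknowledgement checkpoint, so unacknowledged messages handed out in earlier batches become visible again after a reset.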
-
resetBatching
public void resetBatching(String clientId, String subscriptionName)
Description copied from interface: TopicMessageStore
A hint to the store to reset any batching state for a durable subscriber.
- Specified by: resetBatching in interface TopicMessageStore
-
getDestination
public ActiveMQDestination getDestination()
Description copied from interface: MessageStore
The destination that the message store is holding messages for.
- Specified by: getDestination in interface MessageStore
- Overrides: getDestination in class ProxyMessageStore
- Returns: the destination
-
getAllSubscriptions
public SubscriptionInfo[] getAllSubscriptions() throws IOException
Description copied from interface: TopicMessageStore
Lists all the durable subscriptions for a given destination.
- Specified by: getAllSubscriptions in interface TopicMessageStore
- Returns: an array of SubscriptionInfos
- Throws: IOException
-
setMemoryUsage
public void setMemoryUsage(MemoryUsage memoryUsage)
- Specified by: setMemoryUsage in interface MessageStore
- Overrides: setMemoryUsage in class ProxyMessageStore
- Parameters: memoryUsage - The SystemUsage that is controlling the destination's memory usage.
-
getMessageCount
public int getMessageCount(String clientId, String subscriberName) throws IOException
Description copied from interface: TopicMessageStore
Gets the number of messages ready to deliver from the store to a durable subscriber.
- Specified by: getMessageCount in interface TopicMessageStore
- Returns: the outstanding message count
- Throws: IOException
-
getMessageCount
public int getMessageCount() throws IOException
- Specified by: getMessageCount in interface MessageStore
- Overrides: getMessageCount in class ProxyMessageStore
- Returns: the number of messages ready to deliver
- Throws: IOException
-
getMessageSize
public long getMessageSize() throws IOException
- Specified by: getMessageSize in interface MessageStore
- Overrides: getMessageSize in class ProxyMessageStore
- Returns: the size of the messages ready to deliver
- Throws: IOException
-
recoverNextMessages
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception
- Specified by: recoverNextMessages in interface MessageStore
- Overrides: recoverNextMessages in class ProxyMessageStore
- Throws: Exception
-
dispose
public void dispose(ConnectionContext context)
- Specified by: dispose in interface MessageStore
- Overrides: dispose in class ProxyMessageStore
-
resetBatching
public void resetBatching()
Description copied from interface: MessageStore
A hint to the store to reset any batching state for the destination.
- Specified by: resetBatching in interface MessageStore
- Overrides: resetBatching in class ProxyMessageStore
-
setBatch
public void setBatch(MessageId messageId) throws Exception
Description copied from interface: MessageStore
Allows caching cursors to set the current batch offset when the cache is exhausted.
- Specified by: setBatch in interface MessageStore
- Overrides: setBatch in class ProxyMessageStore
- Throws: Exception
-
isEmpty
public boolean isEmpty() throws Exception
Description copied from interface: MessageStore
Flag to indicate if the store is empty.
- Specified by: isEmpty in interface MessageStore
- Overrides: isEmpty in class ProxyMessageStore
- Returns: true if the message count is 0
- Throws: Exception
-
asyncAddTopicMessage
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException
Description copied from interface: MessageStore
Adds a message to the message store.
- Specified by: asyncAddTopicMessage in interface MessageStore
- Overrides: asyncAddTopicMessage in class ProxyMessageStore
- Parameters: context - context
- Returns: a ListenableFuture to track when this is complete
- Throws: IOException
-
asyncAddTopicMessage
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException
Description copied from interface: MessageStore
Adds a message to the message store.
- Specified by: asyncAddTopicMessage in interface MessageStore
- Overrides: asyncAddTopicMessage in class ProxyMessageStore
- Parameters: context - context; canOptimizeHint - gives a hint to the store that the message may be consumed before it hits the disk
- Returns: a ListenableFuture to track when this is complete
- Throws: IOException
-
asyncAddQueueMessage
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException
Description copied from interface: MessageStore
Adds a message to the message store.
- Specified by: asyncAddQueueMessage in interface MessageStore
- Overrides: asyncAddQueueMessage in class ProxyMessageStore
- Parameters: context - context
- Returns: a Future to track when this is complete
- Throws: IOException
-
asyncAddQueueMessage
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException
Description copied from interface: MessageStore
Adds a message to the message store.
- Specified by: asyncAddQueueMessage in interface MessageStore
- Overrides: asyncAddQueueMessage in class ProxyMessageStore
- Parameters: context - context; canOptimizeHint - gives a hint to the store that the message may be consumed before it hits the disk
- Returns: a Future to track when this is complete
- Throws: IOException
-
removeAsyncMessage
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException
- Specified by: removeAsyncMessage in interface MessageStore
- Overrides: removeAsyncMessage in class ProxyMessageStore
- Throws: IOException
-
setPrioritizedMessages
public void setPrioritizedMessages(boolean prioritizedMessages)
Description copied from interface: MessageStore
A hint to the store to try to recover messages according to priority.
- Specified by: setPrioritizedMessages in interface MessageStore
- Overrides: setPrioritizedMessages in class ProxyMessageStore
-
isPrioritizedMessages
public boolean isPrioritizedMessages()
- Specified by: isPrioritizedMessages in interface MessageStore
- Overrides: isPrioritizedMessages in class ProxyMessageStore
- Returns: true if the store is trying to recover messages according to priority
-
updateMessage
public void updateMessage(Message message) throws IOException
- Specified by: updateMessage in interface MessageStore
- Overrides: updateMessage in class ProxyMessageStore
- Throws: IOException
-
registerIndexListener
public void registerIndexListener(IndexListener indexListener)
- Specified by: registerIndexListener in interface MessageStore
- Overrides: registerIndexListener in class ProxyMessageStore
-
getMessageStoreStatistics
public MessageStoreStatistics getMessageStoreStatistics()
- Specified by: getMessageStoreStatistics in interface MessageStore
- Overrides: getMessageStoreStatistics in class ProxyMessageStore
- Returns: the statistics bean for this message store
-
getMessageSize
public long getMessageSize(String clientId, String subscriberName) throws IOException
Description copied from interface: TopicMessageStore
Gets the total size of the messages ready to deliver from the store to the durable subscriber.
- Specified by: getMessageSize in interface TopicMessageStore
- Throws: IOException
-
getMessageStoreSubStatistics
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics()
Description copied from interface: TopicMessageStore
The subscription metrics contained in this store.
- Specified by: getMessageStoreSubStatistics in interface TopicMessageStore
-