Class DurableTopicSubscription
- java.lang.Object
-
- org.apache.activemq.broker.region.AbstractSubscription
-
- org.apache.activemq.broker.region.PrefetchSubscription
-
- org.apache.activemq.broker.region.DurableTopicSubscription
-
- All Implemented Interfaces:
Subscription, SubscriptionRecovery, UsageListener
public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener
-
-
Field Summary
-
Fields inherited from class org.apache.activemq.broker.region.PrefetchSubscription
dispatched, dispatchLock, pending, pendingLock, scheduler, usageManager
-
Fields inherited from class org.apache.activemq.broker.region.AbstractSubscription
broker, context, destinationFilter, destinations, info, prefetchExtension
-
-
Constructor Summary
Constructor:
DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
-
Method Summary
Modifier and Type, Method, Description:
protected void
acknowledge(ConnectionContext context, MessageAck ack, MessageReference node)
Used during acknowledgment to remove the message.
void
activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker)
void
add(ConnectionContext context, Destination destination)
The subscription will be receiving messages from the destination.
void
add(MessageReference node)
Used to add messages that match the subscription.
protected boolean
canDispatch(MessageReference node)
Use when a matched message is about to be dispatched to the client.
protected MessageDispatch
createMessageDispatch(MessageReference node, Message message)
void
deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId)
void
destroy()
Release any references that we are holding.
void
dispatchPending()
protected void
doAddRecoveredMessage(MessageReference message)
void
gc()
The subscription should release as many references as it can to help the garbage collector reclaim memory.
long
getOfflineTimestamp()
int
getPendingQueueSize()
SubscriptionKey
getSubscriptionKey()
boolean
isActive()
protected boolean
isDropped(MessageReference node)
boolean
isEmpty(Topic topic)
boolean
isEnableMessageExpirationOnActiveDurableSubs()
boolean
isFull()
Used to determine if the broker can dispatch to the consumer.
boolean
isKeepDurableSubsActive()
void
onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)
void
removePending(MessageReference node)
void
setOfflineTimestamp(long timestamp)
protected void
setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch)
void
setSelector(String selector)
Attempts to change the current active selector on the subscription.
String
toString()
protected boolean
trackedInPendingTransaction(MessageReference node)
void
unmatched(MessageReference node)
The store will have a pending ack for all durable subscriptions, irrespective of the selector, so the node must be acked if it is unmatched.
-
Methods inherited from class org.apache.activemq.broker.region.PrefetchSubscription
acknowledge, assertAckMatchesDispatched, countBeforeFull, dispatch, getDequeueCounter, getDispatchedCounter, getDispatchedQueueSize, getEnqueueCounter, getInFlightSize, getMaxAuditDepth, getMaxProducersToAudit, getPending, getPendingMessageSize, isHighWaterMark, isLowWaterMark, isRecoveryRequired, onDispatch, processMessageDispatchNotification, pullMessage, remove, remove, sendToDLQ, setMaxAuditDepth, setMaxProducersToAudit, setPending, setPrefetchSize, updateConsumerPrefetch
-
Methods inherited from class org.apache.activemq.broker.region.AbstractSubscription
addDestination, addRecoveredMessage, contractPrefetchExtension, decrementPrefetchExtension, expandPrefetchExtension, getActiveMQDestination, getConsumedCount, getConsumerInfo, getContext, getCursorMemoryHighWaterMark, getDestinations, getInFlightMessageSize, getInFlightUsage, getInfo, getObjectName, getPrefetchExtension, getPrefetchSize, getSelector, getSelectorExpression, getSubscriptionStatistics, getTimeOfLastMessageAck, incrementConsumedCount, isBrowser, isSlowConsumer, isUsePrefetchExtension, isWildcard, matches, matches, removeDestination, resetConsumedCount, setCursorMemoryHighWaterMark, setObjectName, setSlowConsumer, setTimeOfLastMessageAck, setUsePrefetchExtension, wakeupDestinationsForDispatch
-
-
-
-
Constructor Detail
-
DurableTopicSubscription
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws JMSException
- Throws:
JMSException
-
-
Method Detail
-
isActive
public final boolean isActive()
-
getOfflineTimestamp
public final long getOfflineTimestamp()
-
setOfflineTimestamp
public void setOfflineTimestamp(long timestamp)
-
isFull
public boolean isFull()
Description copied from class: PrefetchSubscription
Used to determine if the broker can dispatch to the consumer.
- Specified by:
isFull in interface Subscription
- Overrides:
isFull in class PrefetchSubscription
- Returns:
- true if the subscription is full
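To make the prefetch check concrete, here is a minimal stand-alone model that does not use any ActiveMQ classes. It assumes a subscription counts as full once the number of dispatched but unacknowledged messages reaches the prefetch size. `PrefetchWindow`, `tryDispatch`, `ack`, and `inFlight` are hypothetical names for this sketch, and the real implementation may apply further conditions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a prefetch-limited consumer; not the ActiveMQ implementation. */
final class PrefetchWindow {
    private final int prefetchSize;                              // max unacknowledged messages
    private final Deque<String> dispatched = new ArrayDeque<>(); // in-flight message ids

    PrefetchWindow(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    /** Assumption: full means the in-flight count has reached the prefetch size. */
    boolean isFull() {
        return dispatched.size() >= prefetchSize;
    }

    /** Dispatches only when there is room; returns whether the message was sent. */
    boolean tryDispatch(String messageId) {
        if (isFull()) {
            return false;
        }
        dispatched.addLast(messageId);
        return true;
    }

    /** Acknowledging the oldest in-flight message frees one slot. */
    void ack() {
        dispatched.pollFirst();
    }

    int inFlight() {
        return dispatched.size();
    }
}
```

In this model a window of size 2 accepts two messages, refuses a third, and accepts it again after one acknowledgment.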
-
gc
public void gc()
Description copied from interface: Subscription
The subscription should release as many references as it can to help the garbage collector reclaim memory.
- Specified by:
gc in interface Subscription
- Overrides:
gc in class AbstractSubscription
-
unmatched
public void unmatched(MessageReference node) throws IOException
The store will have a pending ack for all durable subscriptions, irrespective of the selector, so the node must be acked if it is unmatched.
- Specified by:
unmatched in interface Subscription
- Overrides:
unmatched in class AbstractSubscription
- Throws:
IOException
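The reasoning behind unmatched can be illustrated with a small stand-alone model that uses no ActiveMQ classes. It assumes the store records one pending ack per durable subscriber for every message, before any selector is evaluated. `ToyStore`, `ToyDurableSubscriber`, `publish`, `ack`, `isStored`, and `offer` are hypothetical names used only for this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Toy message store; not the ActiveMQ implementation. */
final class ToyStore {
    // message id -> durable subscribers that still owe an ack
    private final Map<String, Set<String>> pendingAcks = new HashMap<>();

    /** Assumption: every durable subscriber gets a pending ack, selector or not. */
    void publish(String messageId, Set<String> durableSubscribers) {
        pendingAcks.put(messageId, new HashSet<>(durableSubscribers));
    }

    /** Clears one subscriber's ack; the message is removed once nobody owes one. */
    void ack(String messageId, String subscriber) {
        Set<String> owing = pendingAcks.get(messageId);
        if (owing != null) {
            owing.remove(subscriber);
            if (owing.isEmpty()) {
                pendingAcks.remove(messageId);
            }
        }
    }

    boolean isStored(String messageId) {
        return pendingAcks.containsKey(messageId);
    }
}

/** Toy durable subscriber with a selector modelled as a predicate on the body. */
final class ToyDurableSubscriber {
    private final String name;
    private final Predicate<String> selector;
    private final ToyStore store;

    ToyDurableSubscriber(String name, Predicate<String> selector, ToyStore store) {
        this.name = name;
        this.selector = selector;
        this.store = store;
    }

    /** Returns true if the message matched and would be queued for delivery. */
    boolean offer(String messageId, String body) {
        if (selector.test(body)) {
            return true; // matched: the ack happens later, after the consumer processes it
        }
        // Unmatched: ack right away, otherwise this subscriber's pending ack
        // would keep the message in the store indefinitely.
        store.ack(messageId, name);
        return false;
    }
}
```

Without the immediate ack in the unmatched branch, a message that only one of two subscribers selects would stay in the toy store even after that subscriber had acknowledged it.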
-
setPendingBatchSize
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch)
- Overrides:
setPendingBatchSize in class PrefetchSubscription
-
add
public void add(ConnectionContext context, Destination destination) throws Exception
Description copied from interface: Subscription
The subscription will be receiving messages from the destination.
- Specified by:
add in interface Subscription
- Overrides:
add in class PrefetchSubscription
- Throws:
Exception
-
isEmpty
public boolean isEmpty(Topic topic)
-
activate
public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception
- Throws:
Exception
-
deactivate
public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception
- Throws:
Exception
-
createMessageDispatch
protected MessageDispatch createMessageDispatch(MessageReference node, Message message)
- Overrides:
createMessageDispatch in class PrefetchSubscription
- Returns:
- MessageDispatch
-
add
public void add(MessageReference node) throws Exception
Description copied from interface: Subscription
Used to add messages that match the subscription.
- Specified by:
add in interface Subscription
- Overrides:
add in class PrefetchSubscription
- Throws:
Exception
InterruptedException
IOException
-
dispatchPending
public void dispatchPending() throws IOException
- Overrides:
dispatchPending in class PrefetchSubscription
- Throws:
IOException
-
removePending
public void removePending(MessageReference node) throws IOException
- Throws:
IOException
-
doAddRecoveredMessage
protected void doAddRecoveredMessage(MessageReference message) throws Exception
- Overrides:
doAddRecoveredMessage in class AbstractSubscription
- Throws:
Exception
-
getPendingQueueSize
public int getPendingQueueSize()
- Specified by:
getPendingQueueSize in interface Subscription
- Overrides:
getPendingQueueSize in class PrefetchSubscription
- Returns:
- number of messages pending delivery
-
setSelector
public void setSelector(String selector) throws InvalidSelectorException
Description copied from interface: Subscription
Attempts to change the current active selector on the subscription. This operation is not supported for persistent topics.
- Specified by:
setSelector in interface Subscription
- Overrides:
setSelector in class AbstractSubscription
- Throws:
InvalidSelectorException
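As an illustration of the contract described for setSelector, the sketch below models a subscription type that accepts a selector at creation but rejects later changes. It is a stand-alone model, not ActiveMQ code: `FixedSelectorSubscription` is a hypothetical name, and the use of `UnsupportedOperationException` to signal the rejection is an assumption of this sketch.

```java
/** Toy subscription whose selector is fixed at creation; not the ActiveMQ class. */
final class FixedSelectorSubscription {
    private final String selector;

    FixedSelectorSubscription(String selector) {
        this.selector = selector;
    }

    String getSelector() {
        return selector;
    }

    /** Assumption: changing the selector after creation is rejected outright. */
    void setSelector(String newSelector) {
        throw new UnsupportedOperationException(
                "selector cannot be changed on this subscription type");
    }
}
```

A caller written against this model should treat the selector as immutable and be prepared for the call to fail rather than silently take effect.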
-
canDispatch
protected boolean canDispatch(MessageReference node)
Description copied from class: PrefetchSubscription
Use when a matched message is about to be dispatched to the client.
- Specified by:
canDispatch in class PrefetchSubscription
- Returns:
- false if the message should not be dispatched to the client (for example, another subscription may have already dispatched it).
-
trackedInPendingTransaction
protected boolean trackedInPendingTransaction(MessageReference node)
- Overrides:
trackedInPendingTransaction in class PrefetchSubscription
-
acknowledge
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException
Description copied from class: PrefetchSubscription
Used during acknowledgment to remove the message.
- Specified by:
acknowledge in class PrefetchSubscription
- Throws:
IOException
-
getSubscriptionKey
public SubscriptionKey getSubscriptionKey()
-
destroy
public void destroy()
Release any references that we are holding.
- Specified by:
destroy in interface Subscription
-
onUsageChanged
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)
- Specified by:
onUsageChanged in interface UsageListener
-
isDropped
protected boolean isDropped(MessageReference node)
- Specified by:
isDropped in class PrefetchSubscription
-
isKeepDurableSubsActive
public boolean isKeepDurableSubsActive()
-
isEnableMessageExpirationOnActiveDurableSubs
public boolean isEnableMessageExpirationOnActiveDurableSubs()
-
-