Class AbstractSubscription
- java.lang.Object
-
- org.apache.activemq.broker.region.AbstractSubscription
-
- All Implemented Interfaces:
Subscription
,SubscriptionRecovery
- Direct Known Subclasses:
PrefetchSubscription
,TopicSubscription
public abstract class AbstractSubscription extends Object implements Subscription
-
-
Field Summary
Fields Modifier and Type Field Description protected Broker
broker
protected ConnectionContext
context
protected DestinationFilter
destinationFilter
protected CopyOnWriteArrayList<Destination>
destinations
protected ConsumerInfo
info
protected AtomicInteger
prefetchExtension
-
Constructor Summary
Constructors Constructor Description AbstractSubscription(Broker broker, ConnectionContext context, ConsumerInfo info)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
acknowledge(ConnectionContext context, MessageAck ack)
Used when client acknowledge receipt of dispatched message.void
add(ConnectionContext context, Destination destination)
The subscription will be receiving messages from the destination.void
addDestination(Destination destination)
Add a destinationboolean
addRecoveredMessage(ConnectionContext context, MessageReference message)
Add a message to the SubscriptionRecoveryprotected void
contractPrefetchExtension(int amount)
int
countBeforeFull()
protected void
decrementPrefetchExtension(int amount)
protected void
doAddRecoveredMessage(MessageReference message)
protected void
expandPrefetchExtension(int amount)
void
gc()
The subscription should release as may references as it can to help the garbage collector reclaim memory.ActiveMQDestination
getActiveMQDestination()
long
getConsumedCount()
ConsumerInfo
getConsumerInfo()
The ConsumerInfo object that created the subscription.ConnectionContext
getContext()
int
getCursorMemoryHighWaterMark()
CopyOnWriteArrayList<Destination>
getDestinations()
long
getInFlightMessageSize()
int
getInFlightUsage()
ConsumerInfo
getInfo()
ObjectName
getObjectName()
AtomicInteger
getPrefetchExtension()
int
getPrefetchSize()
String
getSelector()
BooleanExpression
getSelectorExpression()
SubscriptionStatistics
getSubscriptionStatistics()
long
getTimeOfLastMessageAck()
Returns the time since the last Ack message was received by this subscription.void
incrementConsumedCount()
boolean
isBrowser()
boolean
isRecoveryRequired()
Informs the Broker if the subscription needs to intervention to recover it's state e.g.boolean
isSlowConsumer()
boolean
isUsePrefetchExtension()
boolean
isWildcard()
Returns true if this subscription is a Wildcard subscription.boolean
matches(MessageReference node, MessageEvaluationContext context)
Is the subscription interested in the message?boolean
matches(ActiveMQDestination destination)
Is the subscription interested in messages in the destination?List<MessageReference>
remove(ConnectionContext context, Destination destination)
The subscription will be no longer be receiving messages from the destination.void
removeDestination(Destination destination)
Remove a destinationvoid
resetConsumedCount()
void
setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark)
void
setObjectName(ObjectName objectName)
Set when the subscription is registered in JMXvoid
setPrefetchSize(int newSize)
void
setSelector(String selector)
Attempts to change the current active selector on the subscription.void
setSlowConsumer(boolean val)
void
setTimeOfLastMessageAck(long value)
void
setUsePrefetchExtension(boolean usePrefetchExtension)
void
unmatched(MessageReference node)
void
wakeupDestinationsForDispatch()
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.activemq.broker.region.Subscription
add, destroy, getDequeueCounter, getDispatchedCounter, getDispatchedQueueSize, getEnqueueCounter, getInFlightSize, getPendingMessageSize, getPendingQueueSize, isFull, isHighWaterMark, isLowWaterMark, processMessageDispatchNotification, pullMessage, updateConsumerPrefetch
-
-
-
-
Field Detail
-
broker
protected Broker broker
-
context
protected ConnectionContext context
-
info
protected ConsumerInfo info
-
destinationFilter
protected final DestinationFilter destinationFilter
-
destinations
protected final CopyOnWriteArrayList<Destination> destinations
-
prefetchExtension
protected final AtomicInteger prefetchExtension
-
-
Constructor Detail
-
AbstractSubscription
public AbstractSubscription(Broker broker, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException
- Throws:
InvalidSelectorException
-
-
Method Detail
-
acknowledge
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception
Description copied from interface:Subscription
Used when client acknowledge receipt of dispatched message.- Specified by:
acknowledge
in interfaceSubscription
- Throws:
IOException
Exception
-
matches
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException
Description copied from interface:Subscription
Is the subscription interested in the message?- Specified by:
matches
in interfaceSubscription
- Returns:
- true if matching
- Throws:
IOException
-
isWildcard
public boolean isWildcard()
Description copied from interface:Subscription
Returns true if this subscription is a Wildcard subscription.- Specified by:
isWildcard
in interfaceSubscription
- Returns:
- true if wildcard subscription.
-
matches
public boolean matches(ActiveMQDestination destination)
Description copied from interface:Subscription
Is the subscription interested in messages in the destination?- Specified by:
matches
in interfaceSubscription
- Returns:
- true if matching
-
add
public void add(ConnectionContext context, Destination destination) throws Exception
Description copied from interface:Subscription
The subscription will be receiving messages from the destination.- Specified by:
add
in interfaceSubscription
- Throws:
Exception
-
remove
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception
Description copied from interface:Subscription
The subscription will be no longer be receiving messages from the destination.- Specified by:
remove
in interfaceSubscription
- Returns:
- a list of un-acked messages that were added to the subscription.
- Throws:
Exception
-
getConsumerInfo
public ConsumerInfo getConsumerInfo()
Description copied from interface:Subscription
The ConsumerInfo object that created the subscription.- Specified by:
getConsumerInfo
in interfaceSubscription
-
gc
public void gc()
Description copied from interface:Subscription
The subscription should release as may references as it can to help the garbage collector reclaim memory.- Specified by:
gc
in interfaceSubscription
-
getContext
public ConnectionContext getContext()
- Specified by:
getContext
in interfaceSubscription
-
getInfo
public ConsumerInfo getInfo()
-
getSelectorExpression
public BooleanExpression getSelectorExpression()
-
getSelector
public String getSelector()
- Specified by:
getSelector
in interfaceSubscription
- Returns:
- the JMS selector on the current subscription
-
setSelector
public void setSelector(String selector) throws InvalidSelectorException
Description copied from interface:Subscription
Attempts to change the current active selector on the subscription. This operation is not supported for persistent topics.- Specified by:
setSelector
in interfaceSubscription
- Throws:
InvalidSelectorException
-
getObjectName
public ObjectName getObjectName()
- Specified by:
getObjectName
in interfaceSubscription
- Returns:
- the JMX object name that this subscription was registered as if applicable
-
setObjectName
public void setObjectName(ObjectName objectName)
Description copied from interface:Subscription
Set when the subscription is registered in JMX- Specified by:
setObjectName
in interfaceSubscription
-
getPrefetchSize
public int getPrefetchSize()
- Specified by:
getPrefetchSize
in interfaceSubscription
- Returns:
- the prefetch size that is configured for the subscription
-
isUsePrefetchExtension
public boolean isUsePrefetchExtension()
-
setUsePrefetchExtension
public void setUsePrefetchExtension(boolean usePrefetchExtension)
-
setPrefetchSize
public void setPrefetchSize(int newSize)
-
isRecoveryRequired
public boolean isRecoveryRequired()
Description copied from interface:Subscription
Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber may do- Specified by:
isRecoveryRequired
in interfaceSubscription
- Returns:
- true if recovery required
- See Also:
PendingMessageCursor
-
isSlowConsumer
public boolean isSlowConsumer()
- Specified by:
isSlowConsumer
in interfaceSubscription
-
setSlowConsumer
public void setSlowConsumer(boolean val)
-
addRecoveredMessage
public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception
Description copied from interface:SubscriptionRecovery
Add a message to the SubscriptionRecovery- Specified by:
addRecoveredMessage
in interfaceSubscriptionRecovery
- Returns:
- true if the message is accepted
- Throws:
Exception
-
getActiveMQDestination
public ActiveMQDestination getActiveMQDestination()
- Specified by:
getActiveMQDestination
in interfaceSubscriptionRecovery
- Returns:
- the Destination associated with this Subscription
-
isBrowser
public boolean isBrowser()
- Specified by:
isBrowser
in interfaceSubscription
- Returns:
- true if a browser
-
getInFlightMessageSize
public long getInFlightMessageSize()
- Specified by:
getInFlightMessageSize
in interfaceSubscription
- Returns:
- the size in bytes of the messages awaiting acknowledgement
-
getInFlightUsage
public int getInFlightUsage()
- Specified by:
getInFlightUsage
in interfaceSubscription
- Returns:
- the in flight messages as a percentage of the prefetch size
-
addDestination
public void addDestination(Destination destination)
Add a destination- Parameters:
destination
-
-
removeDestination
public void removeDestination(Destination destination)
Remove a destination- Parameters:
destination
-
-
getCursorMemoryHighWaterMark
public int getCursorMemoryHighWaterMark()
- Specified by:
getCursorMemoryHighWaterMark
in interfaceSubscription
-
setCursorMemoryHighWaterMark
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark)
- Specified by:
setCursorMemoryHighWaterMark
in interfaceSubscription
-
countBeforeFull
public int countBeforeFull()
- Specified by:
countBeforeFull
in interfaceSubscription
- Returns:
- the number of messages this subscription can accept before its full
-
unmatched
public void unmatched(MessageReference node) throws IOException
- Specified by:
unmatched
in interfaceSubscription
- Throws:
IOException
-
doAddRecoveredMessage
protected void doAddRecoveredMessage(MessageReference message) throws Exception
- Throws:
Exception
-
getTimeOfLastMessageAck
public long getTimeOfLastMessageAck()
Description copied from interface:Subscription
Returns the time since the last Ack message was received by this subscription. If there has never been an ack this value should be set to the creation time of the subscription.- Specified by:
getTimeOfLastMessageAck
in interfaceSubscription
- Returns:
- time of last received Ack message or Subscription create time if no Acks.
-
setTimeOfLastMessageAck
public void setTimeOfLastMessageAck(long value)
-
getConsumedCount
public long getConsumedCount()
- Specified by:
getConsumedCount
in interfaceSubscription
-
incrementConsumedCount
public void incrementConsumedCount()
- Specified by:
incrementConsumedCount
in interfaceSubscription
-
resetConsumedCount
public void resetConsumedCount()
- Specified by:
resetConsumedCount
in interfaceSubscription
-
getSubscriptionStatistics
public SubscriptionStatistics getSubscriptionStatistics()
- Specified by:
getSubscriptionStatistics
in interfaceSubscription
-
wakeupDestinationsForDispatch
public void wakeupDestinationsForDispatch()
-
getPrefetchExtension
public AtomicInteger getPrefetchExtension()
-
contractPrefetchExtension
protected void contractPrefetchExtension(int amount)
-
expandPrefetchExtension
protected void expandPrefetchExtension(int amount)
-
decrementPrefetchExtension
protected void decrementPrefetchExtension(int amount)
-
getDestinations
public CopyOnWriteArrayList<Destination> getDestinations()
-
-