Class PrefetchSubscription
- java.lang.Object
- org.apache.activemq.broker.region.AbstractSubscription
- org.apache.activemq.broker.region.PrefetchSubscription
- All Implemented Interfaces:
Subscription, SubscriptionRecovery
- Direct Known Subclasses:
DurableTopicSubscription, QueueSubscription
public abstract class PrefetchSubscription extends AbstractSubscription
A subscription that honors the pre-fetch option of the ConsumerInfo.
-
-
Field Summary
Fields (modifier and type, then field name):
protected List<MessageReference> dispatched
protected Object dispatchLock
protected PendingMessageCursor pending
protected Object pendingLock
protected Scheduler scheduler
protected SystemUsage usageManager
-
Fields inherited from class org.apache.activemq.broker.region.AbstractSubscription
broker, context, destinationFilter, destinations, info, prefetchExtension
-
-
Constructor Summary
Constructors:
PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor)
-
Method Summary
Methods (modifier and type, method, then description where one exists):
void acknowledge(ConnectionContext context, MessageAck ack)
    Used when a client acknowledges receipt of a dispatched message.
protected abstract void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node)
    Used during acknowledgment to remove the message.
void add(ConnectionContext context, Destination destination)
    The subscription will be receiving messages from the destination.
void add(MessageReference node)
    Used to add messages that match the subscription.
protected void assertAckMatchesDispatched(MessageAck ack)
    Checks an ack against the contents of the dispatched list.
protected abstract boolean canDispatch(MessageReference node)
    Used when a matched message is about to be dispatched to the client.
int countBeforeFull()
protected MessageDispatch createMessageDispatch(MessageReference node, Message message)
protected boolean dispatch(MessageReference node)
void dispatchPending()
long getDequeueCounter()
long getDispatchedCounter()
int getDispatchedQueueSize()
long getEnqueueCounter()
int getInFlightSize()
int getMaxAuditDepth()
int getMaxProducersToAudit()
PendingMessageCursor getPending()
long getPendingMessageSize()
int getPendingQueueSize()
protected abstract boolean isDropped(MessageReference node)
boolean isFull()
    Used to determine if the broker can dispatch to the consumer.
boolean isHighWaterMark()
boolean isLowWaterMark()
boolean isRecoveryRequired()
    Informs the Broker if the subscription needs intervention to recover its state.
protected void onDispatch(MessageReference node, Message message)
void processMessageDispatchNotification(MessageDispatchNotification mdn)
    Used by a Slave Broker to update dispatch information.
Response pullMessage(ConnectionContext context, MessagePull pull)
    Allows a message to be pulled on demand by a client.
List<MessageReference> remove(ConnectionContext context, Destination destination)
    The subscription will no longer be receiving messages from the destination.
List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched)
protected void sendToDLQ(ConnectionContext context, MessageReference node, Throwable poisonCause)
void setMaxAuditDepth(int maxAuditDepth)
void setMaxProducersToAudit(int maxProducersToAudit)
void setPending(PendingMessageCursor pending)
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch)
void setPrefetchSize(int prefetchSize)
protected boolean trackedInPendingTransaction(MessageReference node)
void updateConsumerPrefetch(int newPrefetch)
    Informs the MessageConsumer on the client to change its prefetch.
-
Methods inherited from class org.apache.activemq.broker.region.AbstractSubscription
addDestination, addRecoveredMessage, contractPrefetchExtension, decrementPrefetchExtension, doAddRecoveredMessage, expandPrefetchExtension, gc, getActiveMQDestination, getConsumedCount, getConsumerInfo, getContext, getCursorMemoryHighWaterMark, getDestinations, getInFlightMessageSize, getInFlightUsage, getInfo, getObjectName, getPrefetchExtension, getPrefetchSize, getSelector, getSelectorExpression, getSubscriptionStatistics, getTimeOfLastMessageAck, incrementConsumedCount, isBrowser, isSlowConsumer, isUsePrefetchExtension, isWildcard, matches, matches, removeDestination, resetConsumedCount, setCursorMemoryHighWaterMark, setObjectName, setSelector, setSlowConsumer, setTimeOfLastMessageAck, setUsePrefetchExtension, unmatched, wakeupDestinationsForDispatch
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.activemq.broker.region.Subscription
destroy
-
-
-
-
Field Detail
-
scheduler
protected final Scheduler scheduler
-
pending
protected PendingMessageCursor pending
-
dispatched
protected final List<MessageReference> dispatched
-
usageManager
protected final SystemUsage usageManager
-
pendingLock
protected final Object pendingLock
-
dispatchLock
protected final Object dispatchLock
-
-
Constructor Detail
-
PrefetchSubscription
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException
- Throws:
JMSException
-
PrefetchSubscription
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException
- Throws:
JMSException
-
-
Method Detail
-
pullMessage
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception
Allows a message to be pulled on demand by a client.
- Throws:
Exception
-
add
public void add(MessageReference node) throws Exception
Description copied from interface: Subscription
Used to add messages that match the subscription.
- Throws:
Exception
InterruptedException
IOException
-
processMessageDispatchNotification
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception
Description copied from interface: Subscription
Used by a Slave Broker to update dispatch information.
- Throws:
Exception
-
acknowledge
public final void acknowledge(ConnectionContext context, MessageAck ack) throws Exception
Description copied from interface: Subscription
Used when a client acknowledges receipt of a dispatched message.
- Specified by:
acknowledge
in interface Subscription
- Overrides:
acknowledge
in class AbstractSubscription
- Throws:
IOException
Exception
-
assertAckMatchesDispatched
protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException
Checks an ack against the contents of the dispatched list. Called with dispatchLock held.
- Parameters:
ack -
- Throws:
JMSException - if it does not match
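As a rough illustration of the kind of check described here, the sketch below validates an acknowledged range against an ordered list of dispatched message ids. It is not the ActiveMQ implementation: `AckRangeCheck`, `assertMatches`, the plain `String` ids, and the use of `IllegalStateException` in place of `JMSException` are all assumptions made to keep the example self-contained, and it assumes ids are unique within the list.

```java
import java.util.List;

// Hypothetical helper for illustration only; not part of ActiveMQ.
final class AckRangeCheck {

    private AckRangeCheck() {
    }

    /**
     * Verifies that the range firstId..lastId exists in the dispatched list,
     * in that order, and spans exactly expectedCount entries.
     */
    static void assertMatches(List<String> dispatched, String firstId,
                              String lastId, int expectedCount) {
        int first = dispatched.indexOf(firstId);
        if (first < 0) {
            throw new IllegalStateException("first acked id was never dispatched: " + firstId);
        }
        int last = dispatched.indexOf(lastId);
        if (last < first) {
            // Covers both "not found" (-1) and "found before the first id".
            throw new IllegalStateException("last acked id is not at or after the first: " + lastId);
        }
        int actualCount = last - first + 1;
        if (actualCount != expectedCount) {
            throw new IllegalStateException("ack covers " + expectedCount
                    + " messages but the dispatched range holds " + actualCount);
        }
    }
}
```

A caller would be expected to hold whatever lock guards the dispatched list while running such a check, which matches the note that this method is called with dispatchLock held.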
-
sendToDLQ
protected void sendToDLQ(ConnectionContext context, MessageReference node, Throwable poisonCause) throws IOException, Exception
- Parameters:
context -
node -
poisonCause -
- Throws:
IOException
Exception
-
getInFlightSize
public int getInFlightSize()
- Returns:
- the number of messages awaiting acknowledgement
-
isFull
public boolean isFull()
Used to determine if the broker can dispatch to the consumer.
- Returns:
- true if the subscription is full
-
isLowWaterMark
public boolean isLowWaterMark()
- Returns:
- true when 60% or more room is left for dispatching messages
-
isHighWaterMark
public boolean isHighWaterMark()
- Returns:
- true when 10% or less room is left for dispatching messages
-
countBeforeFull
public int countBeforeFull()
- Specified by:
countBeforeFull
in interface Subscription
- Overrides:
countBeforeFull
in class AbstractSubscription
- Returns:
- the number of messages this subscription can accept before it is full
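One way to read the documented return values of isFull, isLowWaterMark, isHighWaterMark and countBeforeFull together is as arithmetic over a prefetch window. The sketch below is a simplified model under that reading, not the ActiveMQ code: `PrefetchWindow` and its methods are hypothetical, and it ignores anything beyond a fixed prefetch size and a count of unacknowledged dispatched messages (for example, any prefetch extension).

```java
// Hypothetical model for illustration only; not the ActiveMQ class.
final class PrefetchWindow {
    private final int prefetchSize;
    private int dispatched; // messages sent to the consumer and not yet acknowledged

    PrefetchWindow(int prefetchSize) {
        if (prefetchSize <= 0) {
            throw new IllegalArgumentException("prefetchSize must be positive");
        }
        this.prefetchSize = prefetchSize;
    }

    void onDispatch() {
        dispatched++;
    }

    void onAck(int count) {
        dispatched = Math.max(0, dispatched - count);
    }

    /** Room left in the window before the consumer is full. */
    int countBeforeFull() {
        return Math.max(0, prefetchSize - dispatched);
    }

    boolean isFull() {
        return dispatched >= prefetchSize;
    }

    /** True when 60% or more of the window is still free (integer math avoids rounding). */
    boolean isLowWaterMark() {
        return (long) countBeforeFull() * 10 >= (long) prefetchSize * 6;
    }

    /** True when 10% or less of the window is still free. */
    boolean isHighWaterMark() {
        return (long) countBeforeFull() * 10 <= (long) prefetchSize;
    }
}
```

With a prefetch size of 10 in this model, the low water mark holds while at most 4 messages are outstanding, and the high water mark is reached once 9 are outstanding.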
-
getPendingQueueSize
public int getPendingQueueSize()
- Returns:
- number of messages pending delivery
-
getPendingMessageSize
public long getPendingMessageSize()
- Returns:
- size of the messages pending delivery
-
getDispatchedQueueSize
public int getDispatchedQueueSize()
- Returns:
- number of messages dispatched to the client
-
getDequeueCounter
public long getDequeueCounter()
- Returns:
- number of messages dequeued by the client
-
getDispatchedCounter
public long getDispatchedCounter()
- Returns:
- number of messages dispatched to the client
-
getEnqueueCounter
public long getEnqueueCounter()
- Returns:
- number of messages that matched the subscription
-
isRecoveryRequired
public boolean isRecoveryRequired()
Description copied from interface: Subscription
Informs the Broker if the subscription needs intervention to recover its state, e.g. DurableTopicSubscriber may do.
- Specified by:
isRecoveryRequired
in interface Subscription
- Overrides:
isRecoveryRequired
in class AbstractSubscription
- Returns:
- true if recovery required
- See Also:
PendingMessageCursor
-
getPending
public PendingMessageCursor getPending()
-
setPending
public void setPending(PendingMessageCursor pending)
-
add
public void add(ConnectionContext context, Destination destination) throws Exception
Description copied from interface: Subscription
The subscription will be receiving messages from the destination.
- Specified by:
add
in interface Subscription
- Overrides:
add
in class AbstractSubscription
- Throws:
Exception
-
remove
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception
Description copied from interface: Subscription
The subscription will no longer be receiving messages from the destination.
- Specified by:
remove
in interface Subscription
- Overrides:
remove
in class AbstractSubscription
- Returns:
- a list of un-acked messages that were added to the subscription.
- Throws:
Exception
-
remove
public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception
- Throws:
Exception
-
dispatchPending
public void dispatchPending() throws IOException
- Throws:
IOException
-
trackedInPendingTransaction
protected boolean trackedInPendingTransaction(MessageReference node)
-
setPendingBatchSize
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch)
-
dispatch
protected boolean dispatch(MessageReference node) throws IOException
- Throws:
IOException
-
onDispatch
protected void onDispatch(MessageReference node, Message message)
-
updateConsumerPrefetch
public void updateConsumerPrefetch(int newPrefetch)
Informs the MessageConsumer on the client to change its prefetch.
- Parameters:
newPrefetch -
-
createMessageDispatch
protected MessageDispatch createMessageDispatch(MessageReference node, Message message)
- Parameters:
node -
message -
- Returns:
- MessageDispatch
-
canDispatch
protected abstract boolean canDispatch(MessageReference node) throws IOException
Used when a matched message is about to be dispatched to the client.
- Parameters:
node -
- Returns:
- false if the message should not be dispatched to the client (for example, another subscription may have already dispatched it).
- Throws:
IOException
-
isDropped
protected abstract boolean isDropped(MessageReference node)
-
acknowledge
protected abstract void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException
Used during acknowledgment to remove the message.
- Throws:
IOException
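The three abstract methods canDispatch, isDropped and acknowledge read like hooks that a concrete subscription fills in while the base class drives dispatching. The sketch below shows that template-method shape in a heavily simplified, self-contained form. Everything in it is an assumption for illustration: `SketchSubscription`, `FilteringSubscription`, the `String` message ids and the control flow are hypothetical, and the meaning given to isDropped (skip messages that have already been discarded) is a guess, since this page gives no description for it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for a prefetch-limited subscription.
abstract class SketchSubscription {
    private final int prefetchSize;
    private final Deque<String> pending = new ArrayDeque<>();
    private final List<String> dispatched = new ArrayList<>();

    SketchSubscription(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    // Hook: return false if the message should not go to this consumer.
    protected abstract boolean canDispatch(String messageId);

    // Hook (assumed meaning): return true if the message was already discarded.
    protected abstract boolean isDropped(String messageId);

    // Hook: called once per acknowledged message so the subclass can remove it.
    protected abstract void acknowledge(String messageId);

    void add(String messageId) {
        pending.add(messageId);
        dispatchPending();
    }

    // Moves pending messages to the dispatched list while the window has room.
    void dispatchPending() {
        while (dispatched.size() < prefetchSize && !pending.isEmpty()) {
            String next = pending.poll();
            if (isDropped(next) || !canDispatch(next)) {
                continue; // skipped messages do not consume prefetch room
            }
            dispatched.add(next);
        }
    }

    void ack(String messageId) {
        if (dispatched.remove(messageId)) {
            acknowledge(messageId);
            dispatchPending(); // an ack frees room for more pending messages
        }
    }

    List<String> dispatchedView() {
        return new ArrayList<>(dispatched);
    }

    int pendingCount() {
        return pending.size();
    }
}

// Hypothetical subclass: refuses messages that another consumer already took.
final class FilteringSubscription extends SketchSubscription {
    private final Set<String> claimedElsewhere;
    final List<String> acked = new ArrayList<>();

    FilteringSubscription(int prefetchSize, Set<String> claimedElsewhere) {
        super(prefetchSize);
        this.claimedElsewhere = claimedElsewhere;
    }

    @Override
    protected boolean canDispatch(String messageId) {
        return !claimedElsewhere.contains(messageId);
    }

    @Override
    protected boolean isDropped(String messageId) {
        return false;
    }

    @Override
    protected void acknowledge(String messageId) {
        acked.add(messageId);
    }
}
```

In this model a subclass only decides per-message policy, while window accounting stays in the base class. That split is the design point the abstract methods suggest, although the real class also deals with locking, cursors and transactions that are left out here.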
-
getMaxProducersToAudit
public int getMaxProducersToAudit()
-
setMaxProducersToAudit
public void setMaxProducersToAudit(int maxProducersToAudit)
-
getMaxAuditDepth
public int getMaxAuditDepth()
-
setMaxAuditDepth
public void setMaxAuditDepth(int maxAuditDepth)
-
setPrefetchSize
public void setPrefetchSize(int prefetchSize)
- Overrides:
setPrefetchSize
in class AbstractSubscription
-
-