Class PrefetchSubscription
- java.lang.Object
-
- org.apache.activemq.broker.region.AbstractSubscription
-
- org.apache.activemq.broker.region.PrefetchSubscription
-
- All Implemented Interfaces:
Subscription, SubscriptionRecovery
- Direct Known Subclasses:
DurableTopicSubscription, QueueSubscription
public abstract class PrefetchSubscription extends AbstractSubscription
A subscription that honors the pre-fetch option of the ConsumerInfo.
-
-
Field Summary
Fields (Modifier and Type, Field, Description):
protected List<MessageReference> dispatched
protected Object dispatchLock
protected PendingMessageCursor pending
protected Object pendingLock
protected Scheduler scheduler
protected SystemUsage usageManager
-
Fields inherited from class org.apache.activemq.broker.region.AbstractSubscription
broker, context, destinationFilter, destinations, info, prefetchExtension
-
-
Constructor Summary
Constructors (Constructor, Description):
PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor)
-
Method Summary
Methods (Modifier and Type, Method, Description):
void acknowledge(ConnectionContext context, MessageAck ack)
    Used when the client acknowledges receipt of a dispatched message.
protected abstract void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node)
    Used during acknowledgment to remove the message.
void add(ConnectionContext context, Destination destination)
    The subscription will be receiving messages from the destination.
void add(MessageReference node)
    Used to add messages that match the subscription.
protected void assertAckMatchesDispatched(MessageAck ack)
    Checks an ack against the contents of the dispatched list.
protected abstract boolean canDispatch(MessageReference node)
    Used when a matched message is about to be dispatched to the client.
int countBeforeFull()
protected MessageDispatch createMessageDispatch(MessageReference node, Message message)
protected boolean dispatch(MessageReference node)
void dispatchPending()
long getDequeueCounter()
long getDispatchedCounter()
int getDispatchedQueueSize()
long getEnqueueCounter()
int getInFlightSize()
int getMaxAuditDepth()
int getMaxProducersToAudit()
PendingMessageCursor getPending()
long getPendingMessageSize()
int getPendingQueueSize()
protected abstract boolean isDropped(MessageReference node)
boolean isFull()
    Used to determine if the broker can dispatch to the consumer.
boolean isHighWaterMark()
boolean isLowWaterMark()
boolean isRecoveryRequired()
    Informs the Broker if the subscription needs intervention to recover its state.
protected void onDispatch(MessageReference node, Message message)
void processMessageDispatchNotification(MessageDispatchNotification mdn)
    Used by a Slave Broker to update dispatch information.
Response pullMessage(ConnectionContext context, MessagePull pull)
    Allows a message to be pulled on demand by a client.
List<MessageReference> remove(ConnectionContext context, Destination destination)
    The subscription will no longer be receiving messages from the destination.
List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched)
protected void sendToDLQ(ConnectionContext context, MessageReference node, Throwable poisonCause)
void setMaxAuditDepth(int maxAuditDepth)
void setMaxProducersToAudit(int maxProducersToAudit)
void setPending(PendingMessageCursor pending)
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch)
void setPrefetchSize(int prefetchSize)
protected boolean trackedInPendingTransaction(MessageReference node)
void updateConsumerPrefetch(int newPrefetch)
    Informs the MessageConsumer on the client to change its prefetch.
-
Methods inherited from class org.apache.activemq.broker.region.AbstractSubscription
addDestination, addRecoveredMessage, contractPrefetchExtension, decrementPrefetchExtension, doAddRecoveredMessage, expandPrefetchExtension, gc, getActiveMQDestination, getConsumedCount, getConsumerInfo, getContext, getCursorMemoryHighWaterMark, getDestinations, getInFlightMessageSize, getInFlightUsage, getInfo, getObjectName, getPrefetchExtension, getPrefetchSize, getSelector, getSelectorExpression, getSubscriptionStatistics, getTimeOfLastMessageAck, incrementConsumedCount, isBrowser, isSlowConsumer, isUsePrefetchExtension, isWildcard, matches, matches, removeDestination, resetConsumedCount, setCursorMemoryHighWaterMark, setObjectName, setSelector, setSlowConsumer, setTimeOfLastMessageAck, setUsePrefetchExtension, unmatched, wakeupDestinationsForDispatch
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.activemq.broker.region.Subscription
destroy
-
-
-
-
Field Detail
-
scheduler
protected final Scheduler scheduler
-
pending
protected PendingMessageCursor pending
-
dispatched
protected final List<MessageReference> dispatched
-
usageManager
protected final SystemUsage usageManager
-
pendingLock
protected final Object pendingLock
-
dispatchLock
protected final Object dispatchLock
-
-
Constructor Detail
-
PrefetchSubscription
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException
- Throws:
JMSException
-
PrefetchSubscription
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException
- Throws:
JMSException
-
-
Method Detail
-
pullMessage
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception
Allows a message to be pulled on demand by a client.
- Throws:
Exception
-
add
public void add(MessageReference node) throws Exception
Description copied from interface: Subscription
Used to add messages that match the subscription.
- Throws:
Exception
InterruptedException
IOException
-
processMessageDispatchNotification
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception
Description copied from interface: Subscription
Used by a Slave Broker to update dispatch information.
- Throws:
Exception
-
acknowledge
public final void acknowledge(ConnectionContext context, MessageAck ack) throws Exception
Description copied from interface: Subscription
Used when the client acknowledges receipt of a dispatched message.
- Specified by:
acknowledge in interface Subscription
- Overrides:
acknowledge in class AbstractSubscription
- Throws:
IOException
Exception
-
assertAckMatchesDispatched
protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException
Checks an ack against the contents of the dispatched list. Called with dispatchLock held.
- Parameters:
ack -
- Throws:
JMSException - if it does not match
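
The kind of check described above can be illustrated with a small standalone sketch. It is not the ActiveMQ implementation: it assumes an ack names a first message ID, a last message ID, and a message count, and it verifies that range against an in-order list of dispatched IDs. The class AckRangeCheckSketch, its method, and the use of plain String IDs and IllegalStateException (in place of JMSException) are all hypothetical, chosen so the example compiles without the broker on the classpath.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of matching an ack against a dispatched list.
// Not the broker's code: IDs are plain strings and failures are reported with
// IllegalStateException rather than JMSException.
final class AckRangeCheckSketch {

    /** Throws IllegalStateException if [firstId..lastId] with the given count is not in dispatched. */
    static void assertAckMatches(List<String> dispatched, String firstId, String lastId, int count) {
        boolean foundStart = false;
        boolean foundEnd = false;
        int seen = 0;
        for (String id : dispatched) {
            if (!foundStart && id.equals(firstId)) {
                foundStart = true;          // start of the acknowledged range
            }
            if (foundStart) {
                seen++;                     // count every message inside the range
            }
            if (id.equals(lastId)) {
                foundEnd = true;            // end of the acknowledged range
                break;
            }
        }
        if (!foundStart) {
            throw new IllegalStateException("start of ack not found in dispatched list: " + firstId);
        }
        if (!foundEnd) {
            throw new IllegalStateException("end of ack not found in dispatched list: " + lastId);
        }
        if (seen != count) {
            throw new IllegalStateException("ack covers " + seen + " messages but claims " + count);
        }
    }

    public static void main(String[] args) {
        List<String> dispatched = Arrays.asList("m1", "m2", "m3", "m4");
        assertAckMatches(dispatched, "m2", "m3", 2);   // matches, returns normally
        try {
            assertAckMatches(dispatched, "m2", "m3", 3);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the real class the check runs with dispatchLock held, so the dispatched list cannot change while it is being scanned; the sketch leaves locking out for brevity.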
-
sendToDLQ
protected void sendToDLQ(ConnectionContext context, MessageReference node, Throwable poisonCause) throws IOException, Exception
- Parameters:
context -
node -
poisonCause -
- Throws:
IOException
Exception
-
getInFlightSize
public int getInFlightSize()
- Returns:
- the number of messages awaiting acknowledgement
-
isFull
public boolean isFull()
Used to determine if the broker can dispatch to the consumer.
- Returns:
- true if the subscription is full
-
isLowWaterMark
public boolean isLowWaterMark()
- Returns:
- true when 60% or more room is left for dispatching messages
-
isHighWaterMark
public boolean isHighWaterMark()
- Returns:
- true when 10% or less room is left for dispatching messages
-
countBeforeFull
public int countBeforeFull()
- Specified by:
countBeforeFull in interface Subscription
- Overrides:
countBeforeFull in class AbstractSubscription
- Returns:
- the number of messages this subscription can accept before it is full
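
To make the thresholds documented for isFull, isLowWaterMark, isHighWaterMark and countBeforeFull concrete, here is a minimal standalone sketch of the arithmetic. It is an illustration under assumptions, not the broker's code: the class PrefetchWindowSketch is hypothetical, it takes "room" to mean prefetch size minus unacknowledged dispatched messages, it ignores the prefetch extension inherited from AbstractSubscription, and it rejects a prefetch size of zero.

```java
// Hypothetical, simplified model of prefetch flow control.
// Assumption: room = prefetchSize - dispatched (unacknowledged) messages.
final class PrefetchWindowSketch {
    private final int prefetchSize;   // maximum unacknowledged messages allowed
    private int dispatched;           // messages sent to the client, not yet acknowledged

    PrefetchWindowSketch(int prefetchSize) {
        if (prefetchSize <= 0) {
            throw new IllegalArgumentException("prefetchSize must be positive in this sketch");
        }
        this.prefetchSize = prefetchSize;
    }

    void onDispatch() { dispatched++; }

    void onAck() { if (dispatched > 0) dispatched--; }

    /** Number of messages that can still be dispatched before the window is full. */
    int countBeforeFull() { return Math.max(0, prefetchSize - dispatched); }

    boolean isFull() { return dispatched >= prefetchSize; }

    /** True when 60% or more of the window is still free. */
    boolean isLowWaterMark() { return countBeforeFull() * 10L >= prefetchSize * 6L; }

    /** True when 10% or less of the window is still free. */
    boolean isHighWaterMark() { return countBeforeFull() * 10L <= prefetchSize; }

    public static void main(String[] args) {
        PrefetchWindowSketch window = new PrefetchWindowSketch(10);
        for (int i = 0; i < 9; i++) {
            window.onDispatch();
        }
        System.out.println("room=" + window.countBeforeFull()
                + " high=" + window.isHighWaterMark()
                + " full=" + window.isFull());
    }
}
```

Integer comparisons scaled by ten are used instead of floating-point percentages so the boundary cases (exactly 60% or exactly 10% free) are evaluated exactly.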
-
getPendingQueueSize
public int getPendingQueueSize()
- Returns:
- number of messages pending delivery
-
getPendingMessageSize
public long getPendingMessageSize()
- Returns:
- size of the messages pending delivery
-
getDispatchedQueueSize
public int getDispatchedQueueSize()
- Returns:
- number of messages dispatched to the client
-
getDequeueCounter
public long getDequeueCounter()
- Returns:
- number of messages queued by the client
-
getDispatchedCounter
public long getDispatchedCounter()
- Returns:
- number of messages dispatched to the client
-
getEnqueueCounter
public long getEnqueueCounter()
- Returns:
- number of messages that matched the subscription
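
One way to read the size and counter getters above is as bookkeeping over two collections: messages pending delivery and messages dispatched but not yet acknowledged. The following standalone sketch models that reading. Everything in it is an assumption made for illustration: the class SubscriptionCountersSketch is hypothetical, message references are plain strings, the dequeue counter is taken to count acknowledged messages, and transactions, redelivery and the prefetch extension are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a prefetch-limited subscription's bookkeeping.
// Not the ActiveMQ implementation; see the assumptions in the lead-in.
final class SubscriptionCountersSketch {
    private final Deque<String> pending = new ArrayDeque<>();   // matched, not yet sent
    private final List<String> dispatched = new ArrayList<>();  // sent, awaiting ack
    private final int prefetchSize;
    private long enqueueCounter;     // messages that matched the subscription
    private long dispatchedCounter;  // messages sent to the client so far
    private long dequeueCounter;     // assumption: messages acknowledged so far

    SubscriptionCountersSketch(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    /** A matching message arrives: queue it, then try to dispatch. */
    void add(String messageId) {
        enqueueCounter++;
        pending.addLast(messageId);
        dispatchPending();
    }

    /** Move pending messages to the dispatched list while the prefetch window has room. */
    void dispatchPending() {
        while (!pending.isEmpty() && dispatched.size() < prefetchSize) {
            dispatched.add(pending.removeFirst());
            dispatchedCounter++;
        }
    }

    /** The client acknowledges one message, which frees a slot in the window. */
    void acknowledge(String messageId) {
        if (dispatched.remove(messageId)) {
            dequeueCounter++;
            dispatchPending();
        }
    }

    int getPendingQueueSize() { return pending.size(); }
    int getDispatchedQueueSize() { return dispatched.size(); }
    int getInFlightSize() { return dispatched.size(); }
    long getEnqueueCounter() { return enqueueCounter; }
    long getDispatchedCounter() { return dispatchedCounter; }
    long getDequeueCounter() { return dequeueCounter; }

    public static void main(String[] args) {
        SubscriptionCountersSketch sub = new SubscriptionCountersSketch(2);
        sub.add("m1");
        sub.add("m2");
        sub.add("m3");               // window is full, so m3 stays pending
        sub.acknowledge("m1");       // frees a slot, so m3 is dispatched
        System.out.println("pending=" + sub.getPendingQueueSize()
                + " inFlight=" + sub.getInFlightSize()
                + " dispatchedTotal=" + sub.getDispatchedCounter());
    }
}
```

In this model the queue sizes describe the current state while the counters only ever grow, which is why getDispatchedQueueSize and getDispatchedCounter can report different numbers.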
-
isRecoveryRequired
public boolean isRecoveryRequired()
Description copied from interface: Subscription
Informs the Broker if the subscription needs intervention to recover its state, e.g. a DurableTopicSubscriber may do.
- Specified by:
isRecoveryRequired in interface Subscription
- Overrides:
isRecoveryRequired in class AbstractSubscription
- Returns:
- true if recovery required
- See Also:
PendingMessageCursor
-
getPending
public PendingMessageCursor getPending()
-
setPending
public void setPending(PendingMessageCursor pending)
-
add
public void add(ConnectionContext context, Destination destination) throws Exception
Description copied from interface: Subscription
The subscription will be receiving messages from the destination.
- Specified by:
add in interface Subscription
- Overrides:
add in class AbstractSubscription
- Throws:
Exception
-
remove
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception
Description copied from interface: Subscription
The subscription will no longer be receiving messages from the destination.
- Specified by:
remove in interface Subscription
- Overrides:
remove in class AbstractSubscription
- Returns:
- a list of un-acked messages that were added to the subscription.
- Throws:
Exception
-
remove
public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception
- Throws:
Exception
-
dispatchPending
public void dispatchPending() throws IOException
- Throws:
IOException
-
trackedInPendingTransaction
protected boolean trackedInPendingTransaction(MessageReference node)
-
setPendingBatchSize
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch)
-
dispatch
protected boolean dispatch(MessageReference node) throws IOException
- Throws:
IOException
-
onDispatch
protected void onDispatch(MessageReference node, Message message)
-
updateConsumerPrefetch
public void updateConsumerPrefetch(int newPrefetch)
Informs the MessageConsumer on the client to change its prefetch.
- Parameters:
newPrefetch -
-
createMessageDispatch
protected MessageDispatch createMessageDispatch(MessageReference node, Message message)
- Parameters:
node -
message -
- Returns:
- MessageDispatch
-
canDispatch
protected abstract boolean canDispatch(MessageReference node) throws IOException
Used when a matched message is about to be dispatched to the client.
- Parameters:
node -
- Returns:
- false if the message should not be dispatched to the client (another sub may have already dispatched it for example).
- Throws:
IOException
-
isDropped
protected abstract boolean isDropped(MessageReference node)
-
acknowledge
protected abstract void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException
Used during acknowledgment to remove the message.
- Throws:
IOException
-
getMaxProducersToAudit
public int getMaxProducersToAudit()
-
setMaxProducersToAudit
public void setMaxProducersToAudit(int maxProducersToAudit)
-
getMaxAuditDepth
public int getMaxAuditDepth()
-
setMaxAuditDepth
public void setMaxAuditDepth(int maxAuditDepth)
-
setPrefetchSize
public void setPrefetchSize(int prefetchSize)
- Overrides:
setPrefetchSize in class AbstractSubscription
-
-