org.apache.activemq.broker.region
Class PrefetchSubscription

java.lang.Object
  extended by org.apache.activemq.broker.region.AbstractSubscription
      extended by org.apache.activemq.broker.region.PrefetchSubscription
All Implemented Interfaces:
Subscription, SubscriptionRecovery
Direct Known Subclasses:
DurableTopicSubscription, QueueSubscription

public abstract class PrefetchSubscription
extends AbstractSubscription

A subscription that honors the pre-fetch option of the ConsumerInfo.


Field Summary
protected  long dequeueCounter
           
protected  long dispatchCounter
           
protected  List<MessageReference> dispatched
           
protected  Object dispatchLock
           
protected  long enqueueCounter
           
protected  PendingMessageCursor pending
           
protected  Object pendingLock
           
protected  AtomicInteger prefetchExtension
           
protected  Scheduler scheduler
           
protected  SystemUsage usageManager
           
protected  boolean usePrefetchExtension
           
 
Fields inherited from class org.apache.activemq.broker.region.AbstractSubscription
broker, context, destinationFilter, destinations, info
 
Constructor Summary
PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
           
PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor)
           
 
Method Summary
 void acknowledge(ConnectionContext context, MessageAck ack)
          Used when client acknowledge receipt of dispatched message.
protected abstract  void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node)
          Used during acknowledgment to remove the message.
 void add(ConnectionContext context, Destination destination)
          The subscription will be receiving messages from the destination.
 void add(MessageReference node)
          Used to add messages that match the subscription.
protected  void assertAckMatchesDispatched(MessageAck ack)
          Checks an ack versus the contents of the dispatched list.
protected abstract  boolean canDispatch(MessageReference node)
          Use when a matched message is about to be dispatched to the client.
 int countBeforeFull()
           
protected  MessageDispatch createMessageDispatch(MessageReference node, Message message)
           
protected  boolean dispatch(MessageReference node)
           
protected  void dispatchPending()
           
 long getDequeueCounter()
           
 long getDispatchedCounter()
           
 int getDispatchedQueueSize()
           
 long getEnqueueCounter()
           
 int getInFlightSize()
           
 int getMaxAuditDepth()
           
 int getMaxProducersToAudit()
           
 PendingMessageCursor getPending()
           
 int getPendingQueueSize()
           
protected  int getPrefetchExtension()
           
protected abstract  boolean isDropped(MessageReference node)
           
 boolean isFull()
          Used to determine if the broker can dispatch to the consumer.
 boolean isHighWaterMark()
           
 boolean isLowWaterMark()
           
 boolean isRecoveryRequired()
          Informs the Broker if the subscription needs to intervention to recover it's state e.g.
 boolean isUsePrefetchExtension()
           
protected  void onDispatch(MessageReference node, Message message)
           
 void processMessageDispatchNotification(MessageDispatchNotification mdn)
          Used by a Slave Broker to update dispatch infomation
 Response pullMessage(ConnectionContext context, MessagePull pull)
          Allows a message to be pulled on demand by a client
 List<MessageReference> remove(ConnectionContext context, Destination destination)
          The subscription will be no longer be receiving messages from the destination.
protected  void sendToDLQ(ConnectionContext context, MessageReference node)
           
 void setMaxAuditDepth(int maxAuditDepth)
           
 void setMaxProducersToAudit(int maxProducersToAudit)
           
 void setPending(PendingMessageCursor pending)
           
protected  void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch)
           
 void setPrefetchSize(int prefetchSize)
           
 void setUsePrefetchExtension(boolean usePrefetchExtension)
           
 void updateConsumerPrefetch(int newPrefetch)
          inform the MessageConsumer on the client to change it's prefetch
 
Methods inherited from class org.apache.activemq.broker.region.AbstractSubscription
addDestination, addRecoveredMessage, doAddRecoveredMessage, gc, getActiveMQDestination, getConsumerInfo, getContext, getCursorMemoryHighWaterMark, getInFlightUsage, getInfo, getObjectName, getPrefetchSize, getSelector, getSelectorExpression, isBrowser, isSlave, isSlowConsumer, matches, matches, removeDestination, setCursorMemoryHighWaterMark, setObjectName, setSelector, setSlowConsumer, unmatched
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface org.apache.activemq.broker.region.Subscription
destroy
 

Field Detail

scheduler

protected final Scheduler scheduler

pending

protected PendingMessageCursor pending

dispatched

protected final List<MessageReference> dispatched

prefetchExtension

protected final AtomicInteger prefetchExtension

usePrefetchExtension

protected boolean usePrefetchExtension

enqueueCounter

protected long enqueueCounter

dispatchCounter

protected long dispatchCounter

dequeueCounter

protected long dequeueCounter

usageManager

protected final SystemUsage usageManager

pendingLock

protected final Object pendingLock

dispatchLock

protected final Object dispatchLock
Constructor Detail

PrefetchSubscription

public PrefetchSubscription(Broker broker,
                            SystemUsage usageManager,
                            ConnectionContext context,
                            ConsumerInfo info,
                            PendingMessageCursor cursor)
                     throws InvalidSelectorException
Throws:
InvalidSelectorException

PrefetchSubscription

public PrefetchSubscription(Broker broker,
                            SystemUsage usageManager,
                            ConnectionContext context,
                            ConsumerInfo info)
                     throws InvalidSelectorException
Throws:
InvalidSelectorException
Method Detail

pullMessage

public Response pullMessage(ConnectionContext context,
                            MessagePull pull)
                     throws Exception
Allows a message to be pulled on demand by a client

Throws:
Exception

add

public void add(MessageReference node)
         throws Exception
Description copied from interface: Subscription
Used to add messages that match the subscription.

Throws:
Exception
InterruptedException
IOException

processMessageDispatchNotification

public void processMessageDispatchNotification(MessageDispatchNotification mdn)
                                        throws Exception
Description copied from interface: Subscription
Used by a Slave Broker to update dispatch infomation

Throws:
Exception

acknowledge

public final void acknowledge(ConnectionContext context,
                              MessageAck ack)
                       throws Exception
Description copied from interface: Subscription
Used when client acknowledge receipt of dispatched message.

Throws:
IOException
Exception

assertAckMatchesDispatched

protected void assertAckMatchesDispatched(MessageAck ack)
                                   throws JMSException
Checks an ack versus the contents of the dispatched list. called with dispatchLock held

Parameters:
ack -
Throws:
JMSException - if it does not match

sendToDLQ

protected void sendToDLQ(ConnectionContext context,
                         MessageReference node)
                  throws IOException,
                         Exception
Parameters:
context -
node -
Throws:
IOException
Exception

getInFlightSize

public int getInFlightSize()
Returns:
the number of messages awaiting acknowledgement

isFull

public boolean isFull()
Used to determine if the broker can dispatch to the consumer.

Returns:

isLowWaterMark

public boolean isLowWaterMark()
Returns:
true when 60% or more room is left for dispatching messages

isHighWaterMark

public boolean isHighWaterMark()
Returns:
true when 10% or less room is left for dispatching messages

countBeforeFull

public int countBeforeFull()
Specified by:
countBeforeFull in interface Subscription
Overrides:
countBeforeFull in class AbstractSubscription
Returns:
the number of messages this subscription can accept before its full

getPendingQueueSize

public int getPendingQueueSize()
Returns:
number of messages pending delivery

getDispatchedQueueSize

public int getDispatchedQueueSize()
Returns:
number of messages dispatched to the client

getDequeueCounter

public long getDequeueCounter()
Returns:
number of messages queued by the client

getDispatchedCounter

public long getDispatchedCounter()
Returns:
number of messages dispatched to the client

getEnqueueCounter

public long getEnqueueCounter()
Returns:
number of messages that matched the subscription

isRecoveryRequired

public boolean isRecoveryRequired()
Description copied from interface: Subscription
Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber may do

Specified by:
isRecoveryRequired in interface Subscription
Overrides:
isRecoveryRequired in class AbstractSubscription
Returns:
true if recovery required
See Also:
org.apache.activemq.region.cursors.PendingMessageCursor

getPending

public PendingMessageCursor getPending()

setPending

public void setPending(PendingMessageCursor pending)

add

public void add(ConnectionContext context,
                Destination destination)
         throws Exception
Description copied from interface: Subscription
The subscription will be receiving messages from the destination.

Specified by:
add in interface Subscription
Overrides:
add in class AbstractSubscription
Throws:
Exception

remove

public List<MessageReference> remove(ConnectionContext context,
                                     Destination destination)
                              throws Exception
Description copied from interface: Subscription
The subscription will be no longer be receiving messages from the destination.

Specified by:
remove in interface Subscription
Overrides:
remove in class AbstractSubscription
Returns:
a list of un-acked messages that were added to the subscription.
Throws:
Exception

dispatchPending

protected void dispatchPending()
                        throws IOException
Throws:
IOException

setPendingBatchSize

protected void setPendingBatchSize(PendingMessageCursor pending,
                                   int numberToDispatch)

dispatch

protected boolean dispatch(MessageReference node)
                    throws IOException
Throws:
IOException

onDispatch

protected void onDispatch(MessageReference node,
                          Message message)

updateConsumerPrefetch

public void updateConsumerPrefetch(int newPrefetch)
inform the MessageConsumer on the client to change it's prefetch

Parameters:
newPrefetch -

createMessageDispatch

protected MessageDispatch createMessageDispatch(MessageReference node,
                                                Message message)
Parameters:
node -
message -
Returns:
MessageDispatch

canDispatch

protected abstract boolean canDispatch(MessageReference node)
                                throws IOException
Use when a matched message is about to be dispatched to the client.

Parameters:
node -
Returns:
false if the message should not be dispatched to the client (another sub may have already dispatched it for example).
Throws:
IOException

isDropped

protected abstract boolean isDropped(MessageReference node)

acknowledge

protected abstract void acknowledge(ConnectionContext context,
                                    MessageAck ack,
                                    MessageReference node)
                             throws IOException
Used during acknowledgment to remove the message.

Throws:
IOException

getMaxProducersToAudit

public int getMaxProducersToAudit()

setMaxProducersToAudit

public void setMaxProducersToAudit(int maxProducersToAudit)

getMaxAuditDepth

public int getMaxAuditDepth()

setMaxAuditDepth

public void setMaxAuditDepth(int maxAuditDepth)

isUsePrefetchExtension

public boolean isUsePrefetchExtension()

setUsePrefetchExtension

public void setUsePrefetchExtension(boolean usePrefetchExtension)

getPrefetchExtension

protected int getPrefetchExtension()

setPrefetchSize

public void setPrefetchSize(int prefetchSize)
Overrides:
setPrefetchSize in class AbstractSubscription


Copyright © 2005-2012 The Apache Software Foundation. All Rights Reserved.