org.apache.activemq.broker.region
Class BaseDestination

java.lang.Object
  extended by org.apache.activemq.broker.region.BaseDestination
All Implemented Interfaces:
Destination, Service, Task
Direct Known Subclasses:
Queue, Topic

public abstract class BaseDestination
extends Object
implements Destination


Field Summary
protected  long blockedProducerWarningInterval
           
protected  Broker broker
           
protected  BrokerService brokerService
           
protected  int cursorMemoryHighWaterMark
           
protected  DeadLetterStrategy deadLetterStrategy
           
static long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC
           
protected  ActiveMQDestination destination
           
protected  DestinationStatistics destinationStatistics
           
static long EXPIRE_MESSAGE_PERIOD
           
protected  long expireMessagesPeriod
           
static int MAX_AUDIT_DEPTH
           
static int MAX_BROWSE_PAGE_SIZE
           
static int MAX_PAGE_SIZE
          The maximum number of messages to page in to the destination from persistent storage
static int MAX_PRODUCERS_TO_AUDIT
           
protected  MemoryUsage memoryUsage
           
protected  Broker regionBroker
           
protected  Scheduler scheduler
           
protected  MessageStore store
           
protected  int storeUsageHighWaterMark
           
protected  SystemUsage systemUsage
           
protected  boolean warnOnProducerFlowControl
           
 
Fields inherited from interface org.apache.activemq.broker.region.Destination
DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL, DEFAULT_DEAD_LETTER_STRATEGY
 
Constructor Summary
BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats)
           
 
Method Summary
 void addProducer(ConnectionContext context, ProducerInfo info)
           
 void addSubscription(ConnectionContext context, Subscription sub)
           
 boolean canGC()
           
protected  MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node)
           
protected  ConnectionContext createConnectionContext()
           
 void dispose(ConnectionContext context)
           
 void fastProducer(ConnectionContext context, ProducerInfo producerInfo)
          Called to notify a producer is too fast
 ActiveMQDestination getActiveMQDestination()
           
 long getBlockedProducerWarningInterval()
           
abstract  List<Subscription> getConsumers()
           
 int getCursorMemoryHighWaterMark()
           
 DeadLetterStrategy getDeadLetterStrategy()
           
protected  long getDestinationSequenceId()
           
 DestinationStatistics getDestinationStatistics()
           
 long getExpireMessagesPeriod()
           
 long getInactiveTimoutBeforeGC()
           
protected abstract  Logger getLog()
           
 int getMaxAuditDepth()
           
 int getMaxBrowsePageSize()
           
 int getMaxExpirePageSize()
           
 int getMaxPageSize()
           
 int getMaxProducersToAudit()
           
 MemoryUsage getMemoryUsage()
           
 MessageStore getMessageStore()
           
 int getMinimumMessageSize()
           
 String getName()
           
 int getOptimizeMessageStoreInFlightLimit()
           
 SlowConsumerStrategy getSlowConsumerStrategy()
           
 int getStoreUsageHighWaterMark()
           
protected  boolean hasRegularConsumers(List<Subscription> consumers)
           
 void initialize()
          initialize the destination
 boolean isActive()
           
 boolean isAdvisoryForConsumed()
           
 boolean isAdvisoryForDelivery()
           
 boolean isAdvisoryForDiscardingMessages()
           
 boolean isAdvisoryForFastProducers()
           
 boolean isAdvisoryForSlowConsumers()
           
 boolean isAdvisoryWhenFull()
           
 boolean isAlwaysRetroactive()
           
 boolean isDisposed()
           
 boolean isDoOptimzeMessageStorage()
           
 boolean isEnableAudit()
           
 void isFull(ConnectionContext context, Usage<?> usage)
          Called when a Usage reaches a limit
 boolean isGcIfInactive()
           
 boolean isGcWithNetworkConsumers()
           
 boolean isLazyDispatch()
           
 boolean isPrioritizedMessages()
           
 boolean isProducerFlowControl()
           
protected  boolean isReduceMemoryFootprint()
           
 boolean isSendAdvisoryIfNoConsumers()
           
 boolean isUseCache()
           
 void markForGC(long timeStamp)
           
 void messageConsumed(ConnectionContext context, MessageReference messageReference)
          called when message is consumed
 void messageDelivered(ConnectionContext context, MessageReference messageReference)
          Called when message is delivered to the broker
 void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference)
          Called when a message is discarded - e.g.
protected  void onMessageWithNoConsumers(ConnectionContext context, Message msg)
          Provides a hook to allow messages with no consumer to be processed in some way - such as to send to a dead letter queue or something..
 void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
          called on Queues in slave mode to allow dispatch to follow subscription choice of master
 void removeProducer(ConnectionContext context, ProducerInfo info)
           
 void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
           
 void setAdvisoryForConsumed(boolean advisoryForConsumed)
           
 void setAdvisoryForDelivery(boolean advisoryForDelivery)
           
 void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages)
           
 void setAdvisoryForFastProducers(boolean advisoryForFastProducers)
           
 void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers)
           
 void setAdvisoryWhenFull(boolean advisoryWhenFull)
           
 void setAlwaysRetroactive(boolean alwaysRetroactive)
           
 void setBlockedProducerWarningInterval(long blockedProducerWarningInterval)
          Set's the interval at which warnings about producers being blocked by resource usage will be triggered.
 void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark)
           
 void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy)
          set the dead letter strategy
 void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage)
           
 void setEnableAudit(boolean enableAudit)
           
 void setExpireMessagesPeriod(long expireMessagesPeriod)
           
 void setGcIfInactive(boolean gcIfInactive)
           
 void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers)
          Indicate if it is ok to gc destinations that have only network consumers
 void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC)
           
 void setLazyDispatch(boolean lazyDispatch)
          set the lazy dispatch - default is false
 void setMaxAuditDepth(int maxAuditDepth)
           
 void setMaxBrowsePageSize(int maxPageSize)
           
 void setMaxExpirePageSize(int maxPageSize)
           
 void setMaxPageSize(int maxPageSize)
           
 void setMaxProducersToAudit(int maxProducersToAudit)
           
 void setMinimumMessageSize(int minimumMessageSize)
           
 void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit)
           
 void setPrioritizedMessages(boolean prioritizedMessages)
           
 void setProducerFlowControl(boolean producerFlowControl)
           
 void setReduceMemoryFootprint(boolean reduceMemoryFootprint)
           
 void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers)
           
 void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy)
           
 void setStoreUsageHighWaterMark(int storeUsageHighWaterMark)
           
 void setUseCache(boolean useCache)
           
 void slowConsumer(ConnectionContext context, Subscription subs)
          Called when there is a slow consumer
protected  void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning)
           
protected  void waitForSpace(ConnectionContext context, Usage<?> usage, String warning)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface org.apache.activemq.broker.region.Destination
acknowledge, browse, clearPendingMessages, gc, messageExpired, send, wakeup
 
Methods inherited from interface org.apache.activemq.Service
start, stop
 
Methods inherited from interface org.apache.activemq.thread.Task
iterate
 

Field Detail

MAX_PAGE_SIZE

public static final int MAX_PAGE_SIZE
The maximum number of messages to page in to the destination from persistent storage

See Also:
Constant Field Values

MAX_BROWSE_PAGE_SIZE

public static final int MAX_BROWSE_PAGE_SIZE
See Also:
Constant Field Values

EXPIRE_MESSAGE_PERIOD

public static final long EXPIRE_MESSAGE_PERIOD
See Also:
Constant Field Values

DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC

public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC
See Also:
Constant Field Values

MAX_PRODUCERS_TO_AUDIT

public static final int MAX_PRODUCERS_TO_AUDIT
See Also:
Constant Field Values

MAX_AUDIT_DEPTH

public static final int MAX_AUDIT_DEPTH
See Also:
Constant Field Values

destination

protected final ActiveMQDestination destination

broker

protected final Broker broker

store

protected final MessageStore store

systemUsage

protected SystemUsage systemUsage

memoryUsage

protected MemoryUsage memoryUsage

warnOnProducerFlowControl

protected boolean warnOnProducerFlowControl

blockedProducerWarningInterval

protected long blockedProducerWarningInterval

destinationStatistics

protected final DestinationStatistics destinationStatistics

brokerService

protected final BrokerService brokerService

regionBroker

protected final Broker regionBroker

deadLetterStrategy

protected DeadLetterStrategy deadLetterStrategy

expireMessagesPeriod

protected long expireMessagesPeriod

cursorMemoryHighWaterMark

protected int cursorMemoryHighWaterMark

storeUsageHighWaterMark

protected int storeUsageHighWaterMark

scheduler

protected final Scheduler scheduler
Constructor Detail

BaseDestination

public BaseDestination(BrokerService brokerService,
                       MessageStore store,
                       ActiveMQDestination destination,
                       DestinationStatistics parentStats)
                throws Exception
Parameters:
brokerService -
store -
destination -
parentStats -
Throws:
Exception
Method Detail

initialize

public void initialize()
                throws Exception
initialize the destination

Throws:
Exception

isProducerFlowControl

public boolean isProducerFlowControl()
Specified by:
isProducerFlowControl in interface Destination
Returns:
the producerFlowControl

setProducerFlowControl

public void setProducerFlowControl(boolean producerFlowControl)
Specified by:
setProducerFlowControl in interface Destination
Parameters:
producerFlowControl - the producerFlowControl to set

isAlwaysRetroactive

public boolean isAlwaysRetroactive()
Specified by:
isAlwaysRetroactive in interface Destination

setAlwaysRetroactive

public void setAlwaysRetroactive(boolean alwaysRetroactive)
Specified by:
setAlwaysRetroactive in interface Destination

setBlockedProducerWarningInterval

public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval)
Set's the interval at which warnings about producers being blocked by resource usage will be triggered. Values of 0 or less will disable warnings

Specified by:
setBlockedProducerWarningInterval in interface Destination
Parameters:
blockedProducerWarningInterval - the interval at which warning about blocked producers will be triggered.

getBlockedProducerWarningInterval

public long getBlockedProducerWarningInterval()
Specified by:
getBlockedProducerWarningInterval in interface Destination
Returns:
the interval at which warning about blocked producers will be triggered.

getMaxProducersToAudit

public int getMaxProducersToAudit()
Specified by:
getMaxProducersToAudit in interface Destination
Returns:
the maxProducersToAudit

setMaxProducersToAudit

public void setMaxProducersToAudit(int maxProducersToAudit)
Specified by:
setMaxProducersToAudit in interface Destination
Parameters:
maxProducersToAudit - the maxProducersToAudit to set

getMaxAuditDepth

public int getMaxAuditDepth()
Specified by:
getMaxAuditDepth in interface Destination
Returns:
the maxAuditDepth

setMaxAuditDepth

public void setMaxAuditDepth(int maxAuditDepth)
Specified by:
setMaxAuditDepth in interface Destination
Parameters:
maxAuditDepth - the maxAuditDepth to set

isEnableAudit

public boolean isEnableAudit()
Specified by:
isEnableAudit in interface Destination
Returns:
the enableAudit

setEnableAudit

public void setEnableAudit(boolean enableAudit)
Specified by:
setEnableAudit in interface Destination
Parameters:
enableAudit - the enableAudit to set

addProducer

public void addProducer(ConnectionContext context,
                        ProducerInfo info)
                 throws Exception
Specified by:
addProducer in interface Destination
Throws:
Exception

removeProducer

public void removeProducer(ConnectionContext context,
                           ProducerInfo info)
                    throws Exception
Specified by:
removeProducer in interface Destination
Throws:
Exception

addSubscription

public void addSubscription(ConnectionContext context,
                            Subscription sub)
                     throws Exception
Specified by:
addSubscription in interface Destination
Throws:
Exception

removeSubscription

public void removeSubscription(ConnectionContext context,
                               Subscription sub,
                               long lastDeliveredSequenceId)
                        throws Exception
Specified by:
removeSubscription in interface Destination
Throws:
Exception

getMemoryUsage

public final MemoryUsage getMemoryUsage()
Specified by:
getMemoryUsage in interface Destination

getDestinationStatistics

public DestinationStatistics getDestinationStatistics()
Specified by:
getDestinationStatistics in interface Destination

getActiveMQDestination

public ActiveMQDestination getActiveMQDestination()
Specified by:
getActiveMQDestination in interface Destination

getName

public final String getName()
Specified by:
getName in interface Destination

getMessageStore

public final MessageStore getMessageStore()
Specified by:
getMessageStore in interface Destination

isActive

public boolean isActive()
Specified by:
isActive in interface Destination

getMaxPageSize

public int getMaxPageSize()
Specified by:
getMaxPageSize in interface Destination

setMaxPageSize

public void setMaxPageSize(int maxPageSize)
Specified by:
setMaxPageSize in interface Destination

getMaxBrowsePageSize

public int getMaxBrowsePageSize()
Specified by:
getMaxBrowsePageSize in interface Destination

setMaxBrowsePageSize

public void setMaxBrowsePageSize(int maxPageSize)
Specified by:
setMaxBrowsePageSize in interface Destination

getMaxExpirePageSize

public int getMaxExpirePageSize()

setMaxExpirePageSize

public void setMaxExpirePageSize(int maxPageSize)

setExpireMessagesPeriod

public void setExpireMessagesPeriod(long expireMessagesPeriod)

getExpireMessagesPeriod

public long getExpireMessagesPeriod()

isUseCache

public boolean isUseCache()
Specified by:
isUseCache in interface Destination

setUseCache

public void setUseCache(boolean useCache)
Specified by:
setUseCache in interface Destination

getMinimumMessageSize

public int getMinimumMessageSize()
Specified by:
getMinimumMessageSize in interface Destination

setMinimumMessageSize

public void setMinimumMessageSize(int minimumMessageSize)
Specified by:
setMinimumMessageSize in interface Destination

isLazyDispatch

public boolean isLazyDispatch()
Specified by:
isLazyDispatch in interface Destination
Returns:
true if lazyDispatch is enabled

setLazyDispatch

public void setLazyDispatch(boolean lazyDispatch)
Description copied from interface: Destination
set the lazy dispatch - default is false

Specified by:
setLazyDispatch in interface Destination

getDestinationSequenceId

protected long getDestinationSequenceId()

isAdvisoryForSlowConsumers

public boolean isAdvisoryForSlowConsumers()
Returns:
the advisoryForSlowConsumers

setAdvisoryForSlowConsumers

public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers)
Parameters:
advisoryForSlowConsumers - the advisoryForSlowConsumers to set

isAdvisoryForDiscardingMessages

public boolean isAdvisoryForDiscardingMessages()
Returns:
the advisoryForDiscardingMessages

setAdvisoryForDiscardingMessages

public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages)
Parameters:
advisoryForDiscardingMessages - the advisoryForDiscardingMessages to set

isAdvisoryWhenFull

public boolean isAdvisoryWhenFull()
Returns:
the advisoryWhenFull

setAdvisoryWhenFull

public void setAdvisoryWhenFull(boolean advisoryWhenFull)
Parameters:
advisoryWhenFull - the advisoryWhenFull to set

isAdvisoryForDelivery

public boolean isAdvisoryForDelivery()
Returns:
the advisoryForDelivery

setAdvisoryForDelivery

public void setAdvisoryForDelivery(boolean advisoryForDelivery)
Parameters:
advisoryForDelivery - the advisoryForDelivery to set

isAdvisoryForConsumed

public boolean isAdvisoryForConsumed()
Returns:
the advisoryForConsumed

setAdvisoryForConsumed

public void setAdvisoryForConsumed(boolean advisoryForConsumed)
Parameters:
advisoryForConsumed - the advisoryForConsumed to set

isAdvisoryForFastProducers

public boolean isAdvisoryForFastProducers()
Returns:
the advisdoryForFastProducers

setAdvisoryForFastProducers

public void setAdvisoryForFastProducers(boolean advisoryForFastProducers)
Parameters:
advisoryForFastProducers - the advisdoryForFastProducers to set

isSendAdvisoryIfNoConsumers

public boolean isSendAdvisoryIfNoConsumers()

setSendAdvisoryIfNoConsumers

public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers)

getDeadLetterStrategy

public DeadLetterStrategy getDeadLetterStrategy()
Specified by:
getDeadLetterStrategy in interface Destination
Returns:
the dead letter strategy

setDeadLetterStrategy

public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy)
set the dead letter strategy

Parameters:
deadLetterStrategy -

getCursorMemoryHighWaterMark

public int getCursorMemoryHighWaterMark()
Specified by:
getCursorMemoryHighWaterMark in interface Destination

setCursorMemoryHighWaterMark

public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark)
Specified by:
setCursorMemoryHighWaterMark in interface Destination

messageConsumed

public void messageConsumed(ConnectionContext context,
                            MessageReference messageReference)
called when message is consumed

Specified by:
messageConsumed in interface Destination
Parameters:
context -
messageReference -

messageDelivered

public void messageDelivered(ConnectionContext context,
                             MessageReference messageReference)
Called when message is delivered to the broker

Specified by:
messageDelivered in interface Destination
Parameters:
context -
messageReference -

messageDiscarded

public void messageDiscarded(ConnectionContext context,
                             Subscription sub,
                             MessageReference messageReference)
Called when a message is discarded - e.g. running low on memory This will happen only if the policy is enabled - e.g. non durable topics

Specified by:
messageDiscarded in interface Destination
Parameters:
context -
messageReference -

slowConsumer

public void slowConsumer(ConnectionContext context,
                         Subscription subs)
Called when there is a slow consumer

Specified by:
slowConsumer in interface Destination
Parameters:
context -
subs -

fastProducer

public void fastProducer(ConnectionContext context,
                         ProducerInfo producerInfo)
Called to notify a producer is too fast

Specified by:
fastProducer in interface Destination
Parameters:
context -
producerInfo -

isFull

public void isFull(ConnectionContext context,
                   Usage<?> usage)
Called when a Usage reaches a limit

Specified by:
isFull in interface Destination
Parameters:
context -
usage -

dispose

public void dispose(ConnectionContext context)
             throws IOException
Specified by:
dispose in interface Destination
Throws:
IOException

isDisposed

public boolean isDisposed()
Specified by:
isDisposed in interface Destination

onMessageWithNoConsumers

protected void onMessageWithNoConsumers(ConnectionContext context,
                                        Message msg)
                                 throws Exception
Provides a hook to allow messages with no consumer to be processed in some way - such as to send to a dead letter queue or something..

Throws:
Exception

processDispatchNotification

public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
                                 throws Exception
Description copied from interface: Destination
called on Queues in slave mode to allow dispatch to follow subscription choice of master

Specified by:
processDispatchNotification in interface Destination
Throws:
Exception

getStoreUsageHighWaterMark

public final int getStoreUsageHighWaterMark()

setStoreUsageHighWaterMark

public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark)

waitForSpace

protected final void waitForSpace(ConnectionContext context,
                                  Usage<?> usage,
                                  String warning)
                           throws IOException,
                                  InterruptedException,
                                  ResourceAllocationException
Throws:
IOException
InterruptedException
ResourceAllocationException

waitForSpace

protected final void waitForSpace(ConnectionContext context,
                                  Usage<?> usage,
                                  int highWaterMark,
                                  String warning)
                           throws IOException,
                                  InterruptedException,
                                  ResourceAllocationException
Throws:
IOException
InterruptedException
ResourceAllocationException

getLog

protected abstract Logger getLog()

setSlowConsumerStrategy

public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy)

getSlowConsumerStrategy

public SlowConsumerStrategy getSlowConsumerStrategy()
Specified by:
getSlowConsumerStrategy in interface Destination

isPrioritizedMessages

public boolean isPrioritizedMessages()
Specified by:
isPrioritizedMessages in interface Destination

setPrioritizedMessages

public void setPrioritizedMessages(boolean prioritizedMessages)

getInactiveTimoutBeforeGC

public long getInactiveTimoutBeforeGC()
Specified by:
getInactiveTimoutBeforeGC in interface Destination
Returns:
the inactiveTimoutBeforeGC

setInactiveTimoutBeforeGC

public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC)
Parameters:
inactiveTimoutBeforeGC - the inactiveTimoutBeforeGC to set

isGcIfInactive

public boolean isGcIfInactive()
Returns:
the gcIfInactive

setGcIfInactive

public void setGcIfInactive(boolean gcIfInactive)
Parameters:
gcIfInactive - the gcIfInactive to set

setGcWithNetworkConsumers

public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers)
Indicate if it is ok to gc destinations that have only network consumers

Parameters:
gcWithNetworkConsumers -

isGcWithNetworkConsumers

public boolean isGcWithNetworkConsumers()

markForGC

public void markForGC(long timeStamp)
Specified by:
markForGC in interface Destination

canGC

public boolean canGC()
Specified by:
canGC in interface Destination

setReduceMemoryFootprint

public void setReduceMemoryFootprint(boolean reduceMemoryFootprint)

isReduceMemoryFootprint

protected boolean isReduceMemoryFootprint()

isDoOptimzeMessageStorage

public boolean isDoOptimzeMessageStorage()
Specified by:
isDoOptimzeMessageStorage in interface Destination

setDoOptimzeMessageStorage

public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage)
Specified by:
setDoOptimzeMessageStorage in interface Destination

getOptimizeMessageStoreInFlightLimit

public int getOptimizeMessageStoreInFlightLimit()

setOptimizeMessageStoreInFlightLimit

public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit)

getConsumers

public abstract List<Subscription> getConsumers()
Specified by:
getConsumers in interface Destination

hasRegularConsumers

protected boolean hasRegularConsumers(List<Subscription> consumers)

createConnectionContext

protected ConnectionContext createConnectionContext()

convertToNonRangedAck

protected MessageAck convertToNonRangedAck(MessageAck ack,
                                           MessageReference node)


Copyright © 2005-2012 The Apache Software Foundation. All Rights Reserved.