Class Queue
- java.lang.Object
-
- org.apache.activemq.broker.region.BaseDestination
-
- org.apache.activemq.broker.region.Queue
-
- All Implemented Interfaces:
Destination
,Message.MessageDestination
,Service
,IndexListener
,Task
,UsageListener
- Direct Known Subclasses:
TempQueue
public class Queue extends BaseDestination implements Task, UsageListener, IndexListener
The Queue is a List of MessageEntry objects that are dispatched to matching subscriptions.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.activemq.store.IndexListener
IndexListener.MessageContext
-
-
Field Summary
Fields Modifier and Type Field Description protected List<Subscription>
consumers
protected QueueDispatchPendingList
dispatchPendingList
protected static org.slf4j.Logger
LOG
protected PendingMessageCursor
messages
protected TaskRunnerFactory
taskFactory
protected TaskRunner
taskRunner
-
Fields inherited from class org.apache.activemq.broker.region.BaseDestination
blockedProducerWarningInterval, broker, brokerService, cursorMemoryHighWaterMark, deadLetterStrategy, DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC, destination, destinationStatistics, DUPLICATE_FROM_STORE_MSG_PREFIX, EXPIRE_MESSAGE_PERIOD, expireMessagesPeriod, lastBlockedProducerWarnTime, MAX_AUDIT_DEPTH, MAX_BROWSE_PAGE_SIZE, MAX_PAGE_SIZE, MAX_PRODUCERS_TO_AUDIT, memoryUsage, regionBroker, scheduler, started, store, storeUsageHighWaterMark, systemUsage
-
Fields inherited from interface org.apache.activemq.broker.region.Destination
DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL, DEFAULT_DEAD_LETTER_STRATEGY
-
-
Constructor Summary
Constructors Constructor Description Queue(BrokerService brokerService, ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
void
addSubscription(ConnectionContext context, Subscription sub)
protected void
assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId)
protected boolean
assignMessageGroup(Subscription subscription, QueueMessageReference node)
Message[]
browse()
void
clearPendingMessages(int pendingAdditionsCount)
int
copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages)
Copies the messages matching the given filter up to the maximum number of matched messagesint
copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
Copies the messages matching the given selectorint
copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages)
Copies the messages matching the given selector up to the maximum number of matched messagesboolean
copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
Copies the message matching the given messageIdprotected MessageReferenceFilter
createMessageIdFilter(String messageId)
protected MessageReferenceFilter
createSelectorFilter(String selector)
void
doBrowse(List<Message> browseList, int max)
protected void
doBrowseList(List<Message> browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name)
void
gc()
ActiveMQDestination
getActiveMQDestination()
List<Subscription>
getConsumers()
int
getConsumersBeforeDispatchStarts()
DispatchPolicy
getDispatchPolicy()
protected org.slf4j.Logger
getLog()
QueueMessageReference
getMessage(String id)
MessageGroupMapFactory
getMessageGroupMapFactory()
MessageGroupMap
getMessageGroupOwners()
PendingMessageCursor
getMessages()
long
getPendingMessageCount()
long
getPendingMessageSize()
int
getTimeBeforeDispatchStarts()
void
initialize()
initialize the destinationboolean
isAllConsumersExclusiveByDefault()
boolean
isDispatchPaused()
boolean
isOptimizedDispatch()
protected boolean
isOptimizeStorage()
boolean
isResetNeeded()
boolean
isStrictOrderDispatch()
boolean
isUseConsumerPriority()
boolean
iterate()
void
messageExpired(ConnectionContext context, MessageReference reference)
void
messageExpired(ConnectionContext context, Subscription subs, MessageReference reference)
Inform the Destination a message has expiredint
moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
Moves the messages matching the given selectorint
moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages)
Moves the messages matching the given selector up to the maximum number of matched messagesint
moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages)
Moves the messages matching the given filter up to the maximum number of matched messagesboolean
moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
Moves the message matching the given messageIdboolean
moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest)
Move a messagevoid
onAdd(IndexListener.MessageContext messageContext)
called with some global index lock held so that a listener can do order dependent work non null MessageContext.onCompletion called when work is donevoid
onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)
protected void
pageInMessages(boolean force, int maxPageSize)
void
pauseDispatch()
void
processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
called on Queues in slave mode to allow dispatch to follow subscription choice of mastervoid
purge()
int
removeMatchingMessages(String selector)
Removes the messages matching the given selectorint
removeMatchingMessages(String selector, int maximumMessages)
Removes the messages matching the given selector up to the maximum number of matched messagesint
removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages)
Removes the messages matching the given filter up to the maximum number of matched messagesboolean
removeMessage(String messageId)
Removes the message matching the given messageIdprotected void
removeMessage(ConnectionContext c, QueueMessageReference r)
protected void
removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r)
protected void
removeMessage(ConnectionContext context, Subscription sub, QueueMessageReference reference, MessageAck ack)
void
removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
void
resumeDispatch()
int
retryMessages(ConnectionContext context, int maximumMessages)
void
rollbackPendingCursorAdditions(MessageId messageId)
void
send(ProducerBrokerExchange producerExchange, Message message)
void
setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault)
void
setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts)
void
setDispatchPolicy(DispatchPolicy dispatchPolicy)
void
setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory)
void
setMessages(PendingMessageCursor messages)
void
setOptimizedDispatch(boolean optimizedDispatch)
void
setPrioritizedMessages(boolean prioritizedMessages)
void
setStrictOrderDispatch(boolean strictOrderDispatch)
void
setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts)
void
setUseConsumerPriority(boolean useConsumerPriority)
boolean
singlePendingSend()
void
start()
void
stop()
String
toString()
void
wakeup()
optionally called by a Subscriber - to inform the Destination its ready for more messages-
Methods inherited from class org.apache.activemq.broker.region.BaseDestination
addProducer, canGC, convertToNonRangedAck, createConnectionContext, dispose, duplicateFromStore, fastProducer, getBlockedProducerWarningInterval, getCursorMemoryHighWaterMark, getDeadLetterStrategy, getDestinationSequenceId, getDestinationStatistics, getExpireMessagesPeriod, getInactiveTimeoutBeforeGC, getMaxAuditDepth, getMaxBrowsePageSize, getMaxExpirePageSize, getMaxPageSize, getMaxProducersToAudit, getMemoryUsage, getMessageStore, getMinimumMessageSize, getName, getOptimizeMessageStoreInFlightLimit, getSlowConsumerStrategy, getStoreUsageHighWaterMark, getSystemUsage, getTempUsage, hasRegularConsumers, isActive, isAdvisoryForConsumed, isAdvisoryForDelivery, isAdvisoryForDiscardingMessages, isAdvisoryForFastProducers, isAdvisoryForSlowConsumers, isAdvisoryWhenFull, isAlwaysRetroactive, isDisposed, isDLQ, isDoOptimzeMessageStorage, isEnableAudit, isFlowControlLogRequired, isFull, isGcIfInactive, isGcWithNetworkConsumers, isIncludeBodyForAdvisory, isLazyDispatch, isPersistJMSRedelivered, isPrioritizedMessages, isProducerFlowControl, isReduceMemoryFootprint, isSendAdvisoryIfNoConsumers, isSendDuplicateFromStoreToDLQ, isUseCache, markForGC, messageConsumed, messageDelivered, messageDiscarded, onMessageWithNoConsumers, removeProducer, setAdvisoryForConsumed, setAdvisoryForDelivery, setAdvisoryForDiscardingMessages, setAdvisoryForFastProducers, setAdvisoryForSlowConsumers, setAdvisoryWhenFull, setAlwaysRetroactive, setBlockedProducerWarningInterval, setCursorMemoryHighWaterMark, setDeadLetterStrategy, setDoOptimzeMessageStorage, setEnableAudit, setExpireMessagesPeriod, setGcIfInactive, setGcWithNetworkConsumers, setInactiveTimeoutBeforeGC, setIncludeBodyForAdvisory, setLazyDispatch, setMaxAuditDepth, setMaxBrowsePageSize, setMaxExpirePageSize, setMaxPageSize, setMaxProducersToAudit, setMemoryUsage, setMinimumMessageSize, setOptimizeMessageStoreInFlightLimit, setPersistJMSRedelivered, setProducerFlowControl, setReduceMemoryFootprint, setSendAdvisoryIfNoConsumers, setSendDuplicateFromStoreToDLQ, setSlowConsumerStrategy, setStoreUsageHighWaterMark, setUseCache, slowConsumer, waitForSpace, waitForSpace
-
-
-
-
Field Detail
-
LOG
protected static final org.slf4j.Logger LOG
-
taskFactory
protected final TaskRunnerFactory taskFactory
-
taskRunner
protected TaskRunner taskRunner
-
consumers
protected final List<Subscription> consumers
-
messages
protected PendingMessageCursor messages
-
dispatchPendingList
protected QueueDispatchPendingList dispatchPendingList
-
-
Constructor Detail
-
Queue
public Queue(BrokerService brokerService, ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception
- Throws:
Exception
-
-
Method Detail
-
singlePendingSend
public boolean singlePendingSend()
-
getConsumers
public List<Subscription> getConsumers()
- Specified by:
getConsumers
in interfaceDestination
- Specified by:
getConsumers
in classBaseDestination
-
setPrioritizedMessages
public void setPrioritizedMessages(boolean prioritizedMessages)
- Overrides:
setPrioritizedMessages
in classBaseDestination
-
initialize
public void initialize() throws Exception
Description copied from class:BaseDestination
initialize the destination- Overrides:
initialize
in classBaseDestination
- Throws:
Exception
-
addSubscription
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
- Specified by:
addSubscription
in interfaceDestination
- Overrides:
addSubscription
in classBaseDestination
- Throws:
Exception
-
removeSubscription
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception
- Specified by:
removeSubscription
in interfaceDestination
- Overrides:
removeSubscription
in classBaseDestination
- Throws:
Exception
-
send
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception
- Specified by:
send
in interfaceDestination
- Throws:
Exception
-
onAdd
public void onAdd(IndexListener.MessageContext messageContext)
Description copied from interface:IndexListener
called with some global index lock held so that a listener can do order dependent work non null MessageContext.onCompletion called when work is done- Specified by:
onAdd
in interfaceIndexListener
-
rollbackPendingCursorAdditions
public void rollbackPendingCursorAdditions(MessageId messageId)
-
gc
public void gc()
- Specified by:
gc
in interfaceDestination
-
acknowledge
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException
- Specified by:
acknowledge
in interfaceDestination
- Throws:
IOException
-
getPendingMessageSize
public long getPendingMessageSize()
-
getPendingMessageCount
public long getPendingMessageCount()
-
start
public void start() throws Exception
-
getActiveMQDestination
public ActiveMQDestination getActiveMQDestination()
- Specified by:
getActiveMQDestination
in interfaceDestination
- Overrides:
getActiveMQDestination
in classBaseDestination
-
getMessageGroupOwners
public MessageGroupMap getMessageGroupOwners()
-
getDispatchPolicy
public DispatchPolicy getDispatchPolicy()
-
setDispatchPolicy
public void setDispatchPolicy(DispatchPolicy dispatchPolicy)
-
getMessageGroupMapFactory
public MessageGroupMapFactory getMessageGroupMapFactory()
-
setMessageGroupMapFactory
public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory)
-
getMessages
public PendingMessageCursor getMessages()
-
setMessages
public void setMessages(PendingMessageCursor messages)
-
isUseConsumerPriority
public boolean isUseConsumerPriority()
-
setUseConsumerPriority
public void setUseConsumerPriority(boolean useConsumerPriority)
-
isStrictOrderDispatch
public boolean isStrictOrderDispatch()
-
setStrictOrderDispatch
public void setStrictOrderDispatch(boolean strictOrderDispatch)
-
isOptimizedDispatch
public boolean isOptimizedDispatch()
-
setOptimizedDispatch
public void setOptimizedDispatch(boolean optimizedDispatch)
-
getTimeBeforeDispatchStarts
public int getTimeBeforeDispatchStarts()
-
setTimeBeforeDispatchStarts
public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts)
-
getConsumersBeforeDispatchStarts
public int getConsumersBeforeDispatchStarts()
-
setConsumersBeforeDispatchStarts
public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts)
-
setAllConsumersExclusiveByDefault
public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault)
-
isAllConsumersExclusiveByDefault
public boolean isAllConsumersExclusiveByDefault()
-
isResetNeeded
public boolean isResetNeeded()
-
browse
public Message[] browse()
- Specified by:
browse
in interfaceDestination
-
doBrowseList
protected void doBrowseList(List<Message> browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name) throws Exception
- Throws:
Exception
-
getMessage
public QueueMessageReference getMessage(String id)
-
clearPendingMessages
public void clearPendingMessages(int pendingAdditionsCount)
- Specified by:
clearPendingMessages
in interfaceDestination
-
removeMessage
public boolean removeMessage(String messageId) throws Exception
Removes the message matching the given messageId- Throws:
Exception
-
removeMatchingMessages
public int removeMatchingMessages(String selector) throws Exception
Removes the messages matching the given selector- Returns:
- the number of messages removed
- Throws:
Exception
-
removeMatchingMessages
public int removeMatchingMessages(String selector, int maximumMessages) throws Exception
Removes the messages matching the given selector up to the maximum number of matched messages- Returns:
- the number of messages removed
- Throws:
Exception
-
removeMatchingMessages
public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception
Removes the messages matching the given filter up to the maximum number of matched messages- Returns:
- the number of messages removed
- Throws:
Exception
-
copyMessageTo
public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception
Copies the message matching the given messageId- Throws:
Exception
-
copyMatchingMessagesTo
public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception
Copies the messages matching the given selector- Returns:
- the number of messages copied
- Throws:
Exception
-
copyMatchingMessagesTo
public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception
Copies the messages matching the given selector up to the maximum number of matched messages- Returns:
- the number of messages copied
- Throws:
Exception
-
copyMatchingMessages
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception
Copies the messages matching the given filter up to the maximum number of matched messages- Returns:
- the number of messages copied
- Throws:
Exception
-
moveMessageTo
public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception
Move a message- Parameters:
context
- connection contextm
- QueueMessageReferencedest
- ActiveMQDestination- Throws:
Exception
-
moveMessageTo
public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception
Moves the message matching the given messageId- Throws:
Exception
-
moveMatchingMessagesTo
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception
Moves the messages matching the given selector- Returns:
- the number of messages removed
- Throws:
Exception
-
moveMatchingMessagesTo
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception
Moves the messages matching the given selector up to the maximum number of matched messages- Throws:
Exception
-
moveMatchingMessagesTo
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception
Moves the messages matching the given filter up to the maximum number of matched messages- Throws:
Exception
-
retryMessages
public int retryMessages(ConnectionContext context, int maximumMessages) throws Exception
- Throws:
Exception
-
iterate
public boolean iterate()
- Specified by:
iterate
in interfaceTask
- Returns:
- true if we would like to iterate again
- See Also:
Task.iterate()
-
pauseDispatch
public void pauseDispatch()
-
resumeDispatch
public void resumeDispatch()
-
isDispatchPaused
public boolean isDispatchPaused()
-
createMessageIdFilter
protected MessageReferenceFilter createMessageIdFilter(String messageId)
-
createSelectorFilter
protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException
- Throws:
InvalidSelectorException
-
removeMessage
protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException
- Throws:
IOException
-
removeMessage
protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException
- Throws:
IOException
-
removeMessage
protected void removeMessage(ConnectionContext context, Subscription sub, QueueMessageReference reference, MessageAck ack) throws IOException
- Throws:
IOException
-
messageExpired
public void messageExpired(ConnectionContext context, MessageReference reference)
-
messageExpired
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference)
Description copied from interface:Destination
Inform the Destination a message has expired- Specified by:
messageExpired
in interfaceDestination
-
wakeup
public void wakeup()
Description copied from interface:Destination
optionally called by a Subscriber - to inform the Destination its ready for more messages- Specified by:
wakeup
in interfaceDestination
-
assignMessageGroup
protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception
- Throws:
Exception
-
assignGroup
protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException
- Throws:
IOException
-
pageInMessages
protected void pageInMessages(boolean force, int maxPageSize) throws Exception
- Throws:
Exception
-
processDispatchNotification
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception
Description copied from interface:Destination
called on Queues in slave mode to allow dispatch to follow subscription choice of master- Specified by:
processDispatchNotification
in interfaceDestination
- Overrides:
processDispatchNotification
in classBaseDestination
- Throws:
Exception
-
onUsageChanged
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)
- Specified by:
onUsageChanged
in interfaceUsageListener
-
getLog
protected org.slf4j.Logger getLog()
- Specified by:
getLog
in classBaseDestination
-
isOptimizeStorage
protected boolean isOptimizeStorage()
-
-