Class Queue
- java.lang.Object
-
- org.apache.activemq.broker.region.BaseDestination
-
- org.apache.activemq.broker.region.Queue
-
- All Implemented Interfaces:
Destination,Message.MessageDestination,Service,IndexListener,Task,UsageListener
- Direct Known Subclasses:
TempQueue
public class Queue extends BaseDestination implements Task, UsageListener, IndexListener
The Queue is a List of MessageEntry objects that are dispatched to matching subscriptions.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.activemq.store.IndexListener
IndexListener.MessageContext
-
-
Field Summary
Fields Modifier and Type Field Description protected List<Subscription>consumersprotected QueueDispatchPendingListdispatchPendingListprotected static org.slf4j.LoggerLOGprotected PendingMessageCursormessagesprotected TaskRunnerFactorytaskFactoryprotected TaskRunnertaskRunner-
Fields inherited from class org.apache.activemq.broker.region.BaseDestination
blockedProducerWarningInterval, broker, brokerService, cursorMemoryHighWaterMark, deadLetterStrategy, DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC, destination, destinationStatistics, DUPLICATE_FROM_STORE_MSG_PREFIX, EXPIRE_MESSAGE_PERIOD, expireMessagesPeriod, lastBlockedProducerWarnTime, MAX_AUDIT_DEPTH, MAX_BROWSE_PAGE_SIZE, MAX_PAGE_SIZE, MAX_PRODUCERS_TO_AUDIT, memoryUsage, regionBroker, scheduler, started, store, storeUsageHighWaterMark, systemUsage
-
Fields inherited from interface org.apache.activemq.broker.region.Destination
DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL, DEFAULT_DEAD_LETTER_STRATEGY
-
-
Constructor Summary
Constructors Constructor Description Queue(BrokerService brokerService, ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description voidacknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)voidaddSubscription(ConnectionContext context, Subscription sub)protected voidassignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId)protected booleanassignMessageGroup(Subscription subscription, QueueMessageReference node)Message[]browse()voidclearPendingMessages(int pendingAdditionsCount)intcopyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages)Copies the messages matching the given filter up to the maximum number of matched messagesintcopyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)Copies the messages matching the given selectorintcopyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages)Copies the messages matching the given selector up to the maximum number of matched messagesbooleancopyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)Copies the message matching the given messageIdprotected MessageReferenceFiltercreateMessageIdFilter(String messageId)protected MessageReferenceFiltercreateSelectorFilter(String selector)voiddoBrowse(List<Message> browseList, int max)protected voiddoBrowseList(List<Message> browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name)voidgc()ActiveMQDestinationgetActiveMQDestination()List<Subscription>getConsumers()intgetConsumersBeforeDispatchStarts()DispatchPolicygetDispatchPolicy()protected org.slf4j.LoggergetLog()QueueMessageReferencegetMessage(String id)MessageGroupMapFactorygetMessageGroupMapFactory()MessageGroupMapgetMessageGroupOwners()PendingMessageCursorgetMessages()longgetPendingMessageCount()longgetPendingMessageSize()intgetTimeBeforeDispatchStarts()voidinitialize()initialize the destinationbooleanisAllConsumersExclusiveByDefault()booleanisDispatchPaused()booleanisOptimizedDispatch()protected booleanisOptimizeStorage()booleanisResetNeeded()booleanisStrictOrderDispatch()booleanisUseConsumerPriority()booleaniterate()voidmessageExpired(ConnectionContext context, MessageReference reference)voidmessageExpired(ConnectionContext context, Subscription subs, MessageReference reference)Inform the Destination a message has expiredintmoveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)Moves the messages matching the given selectorintmoveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages)Moves the messages matching the given selector up to the maximum number of matched messagesintmoveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages)Moves the messages matching the given filter up to the maximum number of matched messagesbooleanmoveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)Moves the message matching the given messageIdbooleanmoveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest)Move a messagevoidonAdd(IndexListener.MessageContext messageContext)called with some global index lock held so that a listener can do order dependent work non null MessageContext.onCompletion called when work is donevoidonUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)protected voidpageInMessages(boolean force, int maxPageSize)voidpauseDispatch()voidprocessDispatchNotification(MessageDispatchNotification messageDispatchNotification)called on Queues in slave mode to allow dispatch to follow subscription choice of mastervoidpurge()intremoveMatchingMessages(String selector)Removes the messages matching the given selectorintremoveMatchingMessages(String selector, int maximumMessages)Removes the messages matching the given selector up to the maximum number of matched messagesintremoveMatchingMessages(MessageReferenceFilter filter, int maximumMessages)Removes the messages matching the given filter up to the maximum number of matched messagesbooleanremoveMessage(String messageId)Removes the message matching the given messageIdprotected voidremoveMessage(ConnectionContext c, QueueMessageReference r)protected voidremoveMessage(ConnectionContext c, Subscription subs, QueueMessageReference r)protected voidremoveMessage(ConnectionContext context, Subscription sub, QueueMessageReference reference, MessageAck ack)voidremoveSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)voidresumeDispatch()intretryMessages(ConnectionContext context, int maximumMessages)voidrollbackPendingCursorAdditions(MessageId messageId)voidsend(ProducerBrokerExchange producerExchange, Message message)voidsetAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault)voidsetConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts)voidsetDispatchPolicy(DispatchPolicy dispatchPolicy)voidsetMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory)voidsetMessages(PendingMessageCursor messages)voidsetOptimizedDispatch(boolean optimizedDispatch)voidsetPrioritizedMessages(boolean prioritizedMessages)voidsetStrictOrderDispatch(boolean strictOrderDispatch)voidsetTimeBeforeDispatchStarts(int timeBeforeDispatchStarts)voidsetUseConsumerPriority(boolean useConsumerPriority)booleansinglePendingSend()voidstart()voidstop()StringtoString()voidwakeup()optionally called by a Subscriber - to inform the Destination its ready for more messages-
Methods inherited from class org.apache.activemq.broker.region.BaseDestination
addProducer, canGC, convertToNonRangedAck, createConnectionContext, dispose, duplicateFromStore, fastProducer, getBlockedProducerWarningInterval, getCursorMemoryHighWaterMark, getDeadLetterStrategy, getDestinationSequenceId, getDestinationStatistics, getExpireMessagesPeriod, getInactiveTimeoutBeforeGC, getMaxAuditDepth, getMaxBrowsePageSize, getMaxExpirePageSize, getMaxPageSize, getMaxProducersToAudit, getMemoryUsage, getMessageStore, getMinimumMessageSize, getName, getOptimizeMessageStoreInFlightLimit, getSlowConsumerStrategy, getStoreUsageHighWaterMark, getSystemUsage, getTempUsage, hasRegularConsumers, isActive, isAdvisoryForConsumed, isAdvisoryForDelivery, isAdvisoryForDiscardingMessages, isAdvisoryForFastProducers, isAdvisoryForSlowConsumers, isAdvisoryWhenFull, isAlwaysRetroactive, isDisposed, isDLQ, isDoOptimzeMessageStorage, isEnableAudit, isFlowControlLogRequired, isFull, isGcIfInactive, isGcWithNetworkConsumers, isIncludeBodyForAdvisory, isLazyDispatch, isPersistJMSRedelivered, isPrioritizedMessages, isProducerFlowControl, isReduceMemoryFootprint, isSendAdvisoryIfNoConsumers, isSendDuplicateFromStoreToDLQ, isUseCache, markForGC, messageConsumed, messageDelivered, messageDiscarded, onMessageWithNoConsumers, removeProducer, setAdvisoryForConsumed, setAdvisoryForDelivery, setAdvisoryForDiscardingMessages, setAdvisoryForFastProducers, setAdvisoryForSlowConsumers, setAdvisoryWhenFull, setAlwaysRetroactive, setBlockedProducerWarningInterval, setCursorMemoryHighWaterMark, setDeadLetterStrategy, setDoOptimzeMessageStorage, setEnableAudit, setExpireMessagesPeriod, setGcIfInactive, setGcWithNetworkConsumers, setInactiveTimeoutBeforeGC, setIncludeBodyForAdvisory, setLazyDispatch, setMaxAuditDepth, setMaxBrowsePageSize, setMaxExpirePageSize, setMaxPageSize, setMaxProducersToAudit, setMemoryUsage, setMinimumMessageSize, setOptimizeMessageStoreInFlightLimit, setPersistJMSRedelivered, setProducerFlowControl, setReduceMemoryFootprint, setSendAdvisoryIfNoConsumers, setSendDuplicateFromStoreToDLQ, setSlowConsumerStrategy, setStoreUsageHighWaterMark, setUseCache, slowConsumer, waitForSpace, waitForSpace
-
-
-
-
Field Detail
-
LOG
protected static final org.slf4j.Logger LOG
-
taskFactory
protected final TaskRunnerFactory taskFactory
-
taskRunner
protected TaskRunner taskRunner
-
consumers
protected final List<Subscription> consumers
-
messages
protected PendingMessageCursor messages
-
dispatchPendingList
protected QueueDispatchPendingList dispatchPendingList
-
-
Constructor Detail
-
Queue
public Queue(BrokerService brokerService, ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception
- Throws:
Exception
-
-
Method Detail
-
singlePendingSend
public boolean singlePendingSend()
-
getConsumers
public List<Subscription> getConsumers()
- Specified by:
getConsumersin interfaceDestination- Specified by:
getConsumersin classBaseDestination
-
setPrioritizedMessages
public void setPrioritizedMessages(boolean prioritizedMessages)
- Overrides:
setPrioritizedMessagesin classBaseDestination
-
initialize
public void initialize() throws ExceptionDescription copied from class:BaseDestinationinitialize the destination- Overrides:
initializein classBaseDestination- Throws:
Exception
-
addSubscription
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
- Specified by:
addSubscriptionin interfaceDestination- Overrides:
addSubscriptionin classBaseDestination- Throws:
Exception
-
removeSubscription
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception
- Specified by:
removeSubscriptionin interfaceDestination- Overrides:
removeSubscriptionin classBaseDestination- Throws:
Exception
-
send
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception
- Specified by:
sendin interfaceDestination- Throws:
Exception
-
onAdd
public void onAdd(IndexListener.MessageContext messageContext)
Description copied from interface:IndexListenercalled with some global index lock held so that a listener can do order dependent work non null MessageContext.onCompletion called when work is done- Specified by:
onAddin interfaceIndexListener
-
rollbackPendingCursorAdditions
public void rollbackPendingCursorAdditions(MessageId messageId)
-
gc
public void gc()
- Specified by:
gcin interfaceDestination
-
acknowledge
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException
- Specified by:
acknowledgein interfaceDestination- Throws:
IOException
-
getPendingMessageSize
public long getPendingMessageSize()
-
getPendingMessageCount
public long getPendingMessageCount()
-
start
public void start() throws Exception
-
getActiveMQDestination
public ActiveMQDestination getActiveMQDestination()
- Specified by:
getActiveMQDestinationin interfaceDestination- Overrides:
getActiveMQDestinationin classBaseDestination
-
getMessageGroupOwners
public MessageGroupMap getMessageGroupOwners()
-
getDispatchPolicy
public DispatchPolicy getDispatchPolicy()
-
setDispatchPolicy
public void setDispatchPolicy(DispatchPolicy dispatchPolicy)
-
getMessageGroupMapFactory
public MessageGroupMapFactory getMessageGroupMapFactory()
-
setMessageGroupMapFactory
public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory)
-
getMessages
public PendingMessageCursor getMessages()
-
setMessages
public void setMessages(PendingMessageCursor messages)
-
isUseConsumerPriority
public boolean isUseConsumerPriority()
-
setUseConsumerPriority
public void setUseConsumerPriority(boolean useConsumerPriority)
-
isStrictOrderDispatch
public boolean isStrictOrderDispatch()
-
setStrictOrderDispatch
public void setStrictOrderDispatch(boolean strictOrderDispatch)
-
isOptimizedDispatch
public boolean isOptimizedDispatch()
-
setOptimizedDispatch
public void setOptimizedDispatch(boolean optimizedDispatch)
-
getTimeBeforeDispatchStarts
public int getTimeBeforeDispatchStarts()
-
setTimeBeforeDispatchStarts
public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts)
-
getConsumersBeforeDispatchStarts
public int getConsumersBeforeDispatchStarts()
-
setConsumersBeforeDispatchStarts
public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts)
-
setAllConsumersExclusiveByDefault
public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault)
-
isAllConsumersExclusiveByDefault
public boolean isAllConsumersExclusiveByDefault()
-
isResetNeeded
public boolean isResetNeeded()
-
browse
public Message[] browse()
- Specified by:
browsein interfaceDestination
-
doBrowseList
protected void doBrowseList(List<Message> browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name) throws Exception
- Throws:
Exception
-
getMessage
public QueueMessageReference getMessage(String id)
-
clearPendingMessages
public void clearPendingMessages(int pendingAdditionsCount)
- Specified by:
clearPendingMessagesin interfaceDestination
-
removeMessage
public boolean removeMessage(String messageId) throws Exception
Removes the message matching the given messageId- Throws:
Exception
-
removeMatchingMessages
public int removeMatchingMessages(String selector) throws Exception
Removes the messages matching the given selector- Returns:
- the number of messages removed
- Throws:
Exception
-
removeMatchingMessages
public int removeMatchingMessages(String selector, int maximumMessages) throws Exception
Removes the messages matching the given selector up to the maximum number of matched messages- Returns:
- the number of messages removed
- Throws:
Exception
-
removeMatchingMessages
public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception
Removes the messages matching the given filter up to the maximum number of matched messages- Returns:
- the number of messages removed
- Throws:
Exception
-
copyMessageTo
public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception
Copies the message matching the given messageId- Throws:
Exception
-
copyMatchingMessagesTo
public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception
Copies the messages matching the given selector- Returns:
- the number of messages copied
- Throws:
Exception
-
copyMatchingMessagesTo
public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception
Copies the messages matching the given selector up to the maximum number of matched messages- Returns:
- the number of messages copied
- Throws:
Exception
-
copyMatchingMessages
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception
Copies the messages matching the given filter up to the maximum number of matched messages- Returns:
- the number of messages copied
- Throws:
Exception
-
moveMessageTo
public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception
Move a message- Parameters:
context- connection contextm- QueueMessageReferencedest- ActiveMQDestination- Throws:
Exception
-
moveMessageTo
public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception
Moves the message matching the given messageId- Throws:
Exception
-
moveMatchingMessagesTo
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception
Moves the messages matching the given selector- Returns:
- the number of messages removed
- Throws:
Exception
-
moveMatchingMessagesTo
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception
Moves the messages matching the given selector up to the maximum number of matched messages- Throws:
Exception
-
moveMatchingMessagesTo
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception
Moves the messages matching the given filter up to the maximum number of matched messages- Throws:
Exception
-
retryMessages
public int retryMessages(ConnectionContext context, int maximumMessages) throws Exception
- Throws:
Exception
-
iterate
public boolean iterate()
- Specified by:
iteratein interfaceTask- Returns:
- true if we would like to iterate again
- See Also:
Task.iterate()
-
pauseDispatch
public void pauseDispatch()
-
resumeDispatch
public void resumeDispatch()
-
isDispatchPaused
public boolean isDispatchPaused()
-
createMessageIdFilter
protected MessageReferenceFilter createMessageIdFilter(String messageId)
-
createSelectorFilter
protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException
- Throws:
InvalidSelectorException
-
removeMessage
protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException
- Throws:
IOException
-
removeMessage
protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException
- Throws:
IOException
-
removeMessage
protected void removeMessage(ConnectionContext context, Subscription sub, QueueMessageReference reference, MessageAck ack) throws IOException
- Throws:
IOException
-
messageExpired
public void messageExpired(ConnectionContext context, MessageReference reference)
-
messageExpired
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference)
Description copied from interface:DestinationInform the Destination a message has expired- Specified by:
messageExpiredin interfaceDestination
-
wakeup
public void wakeup()
Description copied from interface:Destinationoptionally called by a Subscriber - to inform the Destination its ready for more messages- Specified by:
wakeupin interfaceDestination
-
assignMessageGroup
protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception
- Throws:
Exception
-
assignGroup
protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException
- Throws:
IOException
-
pageInMessages
protected void pageInMessages(boolean force, int maxPageSize) throws Exception- Throws:
Exception
-
processDispatchNotification
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception
Description copied from interface:Destinationcalled on Queues in slave mode to allow dispatch to follow subscription choice of master- Specified by:
processDispatchNotificationin interfaceDestination- Overrides:
processDispatchNotificationin classBaseDestination- Throws:
Exception
-
onUsageChanged
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)
- Specified by:
onUsageChangedin interfaceUsageListener
-
getLog
protected org.slf4j.Logger getLog()
- Specified by:
getLogin classBaseDestination
-
isOptimizeStorage
protected boolean isOptimizeStorage()
-
-