Class Topic
- java.lang.Object
-
- org.apache.activemq.broker.region.BaseDestination
-
- org.apache.activemq.broker.region.Topic
-
- All Implemented Interfaces:
Destination,Message.MessageDestination,Service,Task
- Direct Known Subclasses:
TempTopic
public class Topic extends BaseDestination implements Task
The Topic is a destination that sends a copy of a message to every active Subscription registered.
-
-
Field Summary
Fields Modifier and Type Field Description protected CopyOnWriteArrayList<Subscription>consumersprotected static org.slf4j.LoggerLOG-
Fields inherited from class org.apache.activemq.broker.region.BaseDestination
blockedProducerWarningInterval, broker, brokerService, cursorMemoryHighWaterMark, deadLetterStrategy, DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC, destination, destinationStatistics, DUPLICATE_FROM_STORE_MSG_PREFIX, EXPIRE_MESSAGE_PERIOD, expireMessagesPeriod, lastBlockedProducerWarnTime, MAX_AUDIT_DEPTH, MAX_BROWSE_PAGE_SIZE, MAX_PAGE_SIZE, MAX_PRODUCERS_TO_AUDIT, memoryUsage, regionBroker, scheduler, started, store, storeUsageHighWaterMark, systemUsage
-
Fields inherited from interface org.apache.activemq.broker.region.Destination
DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL, DEFAULT_DEAD_LETTER_STRATEGY
-
-
Constructor Summary
Constructors Constructor Description Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
-
Method Summary
-
Methods inherited from class org.apache.activemq.broker.region.BaseDestination
addProducer, canGC, convertToNonRangedAck, createConnectionContext, dispose, duplicateFromStore, fastProducer, getActiveMQDestination, getBlockedProducerWarningInterval, getCursorMemoryHighWaterMark, getDeadLetterStrategy, getDestinationSequenceId, getDestinationStatistics, getExpireMessagesPeriod, getInactiveTimeoutBeforeGC, getMaxAuditDepth, getMaxBrowsePageSize, getMaxExpirePageSize, getMaxPageSize, getMaxProducersToAudit, getMemoryUsage, getMessageStore, getMinimumMessageSize, getName, getOptimizeMessageStoreInFlightLimit, getSlowConsumerStrategy, getStoreUsageHighWaterMark, getSystemUsage, getTempUsage, hasRegularConsumers, isActive, isAdvisoryForConsumed, isAdvisoryForDelivery, isAdvisoryForDiscardingMessages, isAdvisoryForFastProducers, isAdvisoryForSlowConsumers, isAdvisoryWhenFull, isAlwaysRetroactive, isDisposed, isDLQ, isDoOptimzeMessageStorage, isEnableAudit, isFlowControlLogRequired, isFull, isGcIfInactive, isGcWithNetworkConsumers, isIncludeBodyForAdvisory, isLazyDispatch, isPersistJMSRedelivered, isPrioritizedMessages, isProducerFlowControl, isReduceMemoryFootprint, isSendAdvisoryIfNoConsumers, isSendDuplicateFromStoreToDLQ, isUseCache, markForGC, messageConsumed, messageDelivered, messageDiscarded, onMessageWithNoConsumers, processDispatchNotification, removeProducer, setAdvisoryForConsumed, setAdvisoryForDelivery, setAdvisoryForDiscardingMessages, setAdvisoryForFastProducers, setAdvisoryForSlowConsumers, setAdvisoryWhenFull, setAlwaysRetroactive, setBlockedProducerWarningInterval, setCursorMemoryHighWaterMark, setDeadLetterStrategy, setDoOptimzeMessageStorage, setEnableAudit, setExpireMessagesPeriod, setGcIfInactive, setGcWithNetworkConsumers, setInactiveTimeoutBeforeGC, setIncludeBodyForAdvisory, setLazyDispatch, setMaxAuditDepth, setMaxBrowsePageSize, setMaxExpirePageSize, setMaxPageSize, setMaxProducersToAudit, setMemoryUsage, setMinimumMessageSize, setOptimizeMessageStoreInFlightLimit, setPersistJMSRedelivered, setPrioritizedMessages, setProducerFlowControl, setReduceMemoryFootprint, setSendAdvisoryIfNoConsumers, setSendDuplicateFromStoreToDLQ, setSlowConsumerStrategy, setStoreUsageHighWaterMark, setUseCache, slowConsumer, waitForSpace, waitForSpace
-
-
-
-
Field Detail
-
LOG
protected static final org.slf4j.Logger LOG
-
consumers
protected final CopyOnWriteArrayList<Subscription> consumers
-
-
Constructor Detail
-
Topic
public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception
- Throws:
Exception
-
-
Method Detail
-
initialize
public void initialize() throws ExceptionDescription copied from class:BaseDestinationinitialize the destination- Overrides:
initializein classBaseDestination- Throws:
Exception
-
getConsumers
public List<Subscription> getConsumers()
- Specified by:
getConsumersin interfaceDestination- Specified by:
getConsumersin classBaseDestination
-
lock
public boolean lock(MessageReference node, LockOwner sub)
-
addSubscription
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
- Specified by:
addSubscriptionin interfaceDestination- Overrides:
addSubscriptionin classBaseDestination- Throws:
Exception
-
removeSubscription
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception
- Specified by:
removeSubscriptionin interfaceDestination- Overrides:
removeSubscriptionin classBaseDestination- Throws:
Exception
-
deleteSubscription
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception
- Throws:
Exception
-
activate
public void activate(ConnectionContext context, DurableTopicSubscription subscription) throws Exception
- Throws:
Exception
-
deactivate
public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception
- Throws:
Exception
-
recoverRetroactiveMessages
public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception
- Throws:
Exception
-
send
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception
- Specified by:
sendin interfaceDestination- Throws:
Exception
-
acknowledge
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException
- Specified by:
acknowledgein interfaceDestination- Throws:
IOException
-
gc
public void gc()
- Specified by:
gcin interfaceDestination
-
loadMessage
public Message loadMessage(MessageId messageId) throws IOException
- Throws:
IOException
-
start
public void start() throws Exception
-
browse
public Message[] browse()
- Specified by:
browsein interfaceDestination
-
getDispatchPolicy
public DispatchPolicy getDispatchPolicy()
-
setDispatchPolicy
public void setDispatchPolicy(DispatchPolicy dispatchPolicy)
-
getSubscriptionRecoveryPolicy
public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy()
-
setSubscriptionRecoveryPolicy
public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy)
-
wakeup
public final void wakeup()
Description copied from interface:Destinationoptionally called by a Subscriber - to inform the Destination its ready for more messages- Specified by:
wakeupin interfaceDestination
-
dispatch
protected void dispatch(ConnectionContext context, Message message) throws Exception
- Throws:
Exception
-
messageExpired
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference)
Description copied from interface:DestinationInform the Destination a message has expired- Specified by:
messageExpiredin interfaceDestination
-
getLog
protected org.slf4j.Logger getLog()
- Specified by:
getLogin classBaseDestination
-
isOptimizeStorage
protected boolean isOptimizeStorage()
-
clearPendingMessages
public void clearPendingMessages(int pendingAdditionsCount)
force a reread of the store - after transaction recovery completion- Specified by:
clearPendingMessagesin interfaceDestination- Parameters:
pendingAdditionsCount-
-
getDurableTopicSubs
public Map<SubscriptionKey,DurableTopicSubscription> getDurableTopicSubs()
-
-