Package org.apache.activemq.advisory
Class AdvisoryBroker
- java.lang.Object
-
- org.apache.activemq.broker.BrokerFilter
-
- org.apache.activemq.advisory.AdvisoryBroker
-
public class AdvisoryBroker extends BrokerFilter
This broker filter handles tracking the state of the broker for purposes of publishing advisory messages to advisory consumers.
-
-
Field Summary
Fields Modifier and Type Field Description protected ProducerId
advisoryProducerId
protected ConcurrentMap<org.apache.activemq.advisory.AdvisoryBroker.VirtualConsumerPair,ConsumerInfo>
brokerConsumerDests
This is a map to track unique demand for the existence of a virtual destination so we make sure we don't send duplicate advisories.protected ConcurrentMap<ConnectionId,ConnectionInfo>
connections
protected Map<ConsumerId,ConsumerInfo>
consumers
protected ConcurrentMap<ActiveMQDestination,DestinationInfo>
destinations
protected ConcurrentMap<BrokerInfo,ActiveMQMessage>
networkBridges
protected ConcurrentMap<ProducerId,ProducerInfo>
producers
protected ConcurrentMap<ConsumerInfo,VirtualDestination>
virtualDestinationConsumers
This is a map to track all consumers that exist on the virtual destination so that we can fire an advisory later when they go away to remove the demand.protected Set<VirtualDestination>
virtualDestinations
This is a set to track all of the virtual destinations that have been added to the broker so they can be easily referenced later.-
Fields inherited from class org.apache.activemq.broker.BrokerFilter
next
-
-
Constructor Summary
Constructors Constructor Description AdvisoryBroker(Broker next)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
addConnection(ConnectionContext context, ConnectionInfo info)
A client is establishing a connection with the broker.Subscription
addConsumer(ConnectionContext context, ConsumerInfo info)
Adds a consumer.Destination
addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create)
Used to create a destination.void
addDestinationInfo(ConnectionContext context, DestinationInfo info)
Add and process a DestinationInfo objectvoid
addProducer(ConnectionContext context, ProducerInfo info)
Adds a producer.void
fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination)
Called to notify a producer is too fastprotected void
fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command)
protected void
fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId)
void
fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage)
protected void
fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command)
protected void
fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId)
protected void
fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command)
protected void
fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId)
Map<ConnectionId,ConnectionInfo>
getAdvisoryConnections()
Collection<ConsumerInfo>
getAdvisoryConsumers()
Map<ActiveMQDestination,DestinationInfo>
getAdvisoryDestinations()
Map<ProducerId,ProducerInfo>
getAdvisoryProducers()
ConcurrentMap<ConsumerInfo,VirtualDestination>
getVirtualDestinationConsumers()
void
isFull(ConnectionContext context, Destination destination, Usage<?> usage)
Called when a Usage reaches a limitvoid
messageConsumed(ConnectionContext context, MessageReference messageReference)
called when message is consumedvoid
messageDelivered(ConnectionContext context, MessageReference messageReference)
Called when message is delivered to the brokervoid
messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference)
Called when a message is discarded - e.g.void
messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription)
A Message has Expiredvoid
networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp)
void
networkBridgeStopped(BrokerInfo brokerInfo)
void
nowMasterBroker()
called when the broker becomes the master in a master/slave configurationvoid
removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error)
A client is disconnecting from the broker.void
removeConsumer(ConnectionContext context, ConsumerInfo info)
Removes a consumer.void
removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
Used to destroy a destination.void
removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo)
Remove and process a DestinationInfo objectvoid
removeProducer(ConnectionContext context, ProducerInfo info)
Removes a producer.void
removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info)
Deletes a durable subscription.boolean
sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause)
A message needs to go the a DLQvoid
slowConsumer(ConnectionContext context, Destination destination, Subscription subs)
Called when there is a slow consumervoid
virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination)
void
virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination)
-
Methods inherited from class org.apache.activemq.broker.BrokerFilter
acknowledge, addBroker, addSession, beginTransaction, brokerServiceStarted, commitTransaction, forgetTransaction, gc, getAdaptor, getAdminConnectionContext, getBrokerId, getBrokerName, getBrokerSequenceId, getBrokerService, getClients, getDestinationMap, getDestinationMap, getDestinations, getDestinations, getDurableDestinations, getExecutor, getNext, getPeerBrokerInfos, getPreparedTransactions, getRoot, getScheduler, getTempDataStore, getVmConnectorURI, isExpired, isFaultTolerantConfiguration, isStopped, messagePull, postProcessDispatch, prepareTransaction, preProcessDispatch, processConsumerControl, processDispatchNotification, reapplyInterceptor, removeBroker, removeSession, rollbackTransaction, send, setAdminConnectionContext, start, stop
-
-
-
-
Field Detail
-
connections
protected final ConcurrentMap<ConnectionId,ConnectionInfo> connections
-
consumers
protected final Map<ConsumerId,ConsumerInfo> consumers
-
virtualDestinations
protected final Set<VirtualDestination> virtualDestinations
This is a set to track all of the virtual destinations that have been added to the broker so they can be easily referenced later.
-
virtualDestinationConsumers
protected final ConcurrentMap<ConsumerInfo,VirtualDestination> virtualDestinationConsumers
This is a map to track all consumers that exist on the virtual destination so that we can fire an advisory later when they go away to remove the demand.
-
brokerConsumerDests
protected final ConcurrentMap<org.apache.activemq.advisory.AdvisoryBroker.VirtualConsumerPair,ConsumerInfo> brokerConsumerDests
This is a map to track unique demand for the existence of a virtual destination so we make sure we don't send duplicate advisories.
-
producers
protected final ConcurrentMap<ProducerId,ProducerInfo> producers
-
destinations
protected final ConcurrentMap<ActiveMQDestination,DestinationInfo> destinations
-
networkBridges
protected final ConcurrentMap<BrokerInfo,ActiveMQMessage> networkBridges
-
advisoryProducerId
protected final ProducerId advisoryProducerId
-
-
Constructor Detail
-
AdvisoryBroker
public AdvisoryBroker(Broker next)
-
-
Method Detail
-
addConnection
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception
Description copied from interface:Broker
A client is establishing a connection with the broker.- Specified by:
addConnection
in interfaceBroker
- Overrides:
addConnection
in classBrokerFilter
- Throws:
Exception
- TODO
-
addConsumer
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
Description copied from interface:Region
Adds a consumer.- Specified by:
addConsumer
in interfaceRegion
- Overrides:
addConsumer
in classBrokerFilter
- Parameters:
context
- the environment the operation is being executed under.- Returns:
- TODO
- Throws:
Exception
- TODO
-
addProducer
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception
Description copied from interface:Broker
Adds a producer.- Specified by:
addProducer
in interfaceBroker
- Specified by:
addProducer
in interfaceRegion
- Overrides:
addProducer
in classBrokerFilter
- Parameters:
context
- the environment the operation is being executed under.- Throws:
Exception
- TODO
-
addDestination
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception
Description copied from interface:Region
Used to create a destination. Usually, this method is invoked as a side-effect of sending a message to a destination that does not exist yet.- Specified by:
addDestination
in interfaceRegion
- Overrides:
addDestination
in classBrokerFilter
destination
- the destination to create.- Returns:
- TODO
- Throws:
Exception
- TODO
-
addDestinationInfo
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception
Description copied from interface:Broker
Add and process a DestinationInfo object- Specified by:
addDestinationInfo
in interfaceBroker
- Overrides:
addDestinationInfo
in classBrokerFilter
- Throws:
Exception
-
removeDestination
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception
Description copied from interface:Region
Used to destroy a destination. This should try to quiesce use of the destination up to the timeout allotted time before removing the destination. This will remove all persistent messages associated with the destination.- Specified by:
removeDestination
in interfaceRegion
- Overrides:
removeDestination
in classBrokerFilter
- Parameters:
context
- the environment the operation is being executed under.destination
- what is being removed from the broker.timeout
- the max amount of time to wait for the destination to quiesce- Throws:
Exception
- TODO
-
removeDestinationInfo
public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception
Description copied from interface:Broker
Remove and process a DestinationInfo object- Specified by:
removeDestinationInfo
in interfaceBroker
- Overrides:
removeDestinationInfo
in classBrokerFilter
- Throws:
Exception
-
removeConnection
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception
Description copied from interface:Broker
A client is disconnecting from the broker.- Specified by:
removeConnection
in interfaceBroker
- Overrides:
removeConnection
in classBrokerFilter
- Parameters:
context
- the environment the operation is being executed under.error
- null if the client requested the disconnect or the error that caused the client to disconnect.- Throws:
Exception
- TODO
-
removeConsumer
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
Description copied from interface:Region
Removes a consumer.- Specified by:
removeConsumer
in interfaceRegion
- Overrides:
removeConsumer
in classBrokerFilter
- Parameters:
context
- the environment the operation is being executed under.- Throws:
Exception
- TODO
-
removeSubscription
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception
Description copied from interface:Region
Deletes a durable subscription.- Specified by:
removeSubscription
in interfaceRegion
- Overrides:
removeSubscription
in classBrokerFilter
- Parameters:
context
- the environment the operation is being executed under.info
- TODO- Throws:
Exception
- TODO
-
removeProducer
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception
Description copied from interface:Broker
Removes a producer.- Specified by:
removeProducer
in interfaceBroker
- Specified by:
removeProducer
in interfaceRegion
- Overrides:
removeProducer
in classBrokerFilter
- Parameters:
context
- the environment the operation is being executed under.- Throws:
Exception
- TODO
-
messageExpired
public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription)
Description copied from interface:Broker
A Message has Expired- Specified by:
messageExpired
in interfaceBroker
- Overrides:
messageExpired
in classBrokerFilter
subscription
- (may be null)
-
messageConsumed
public void messageConsumed(ConnectionContext context, MessageReference messageReference)
Description copied from interface:Broker
called when message is consumed- Specified by:
messageConsumed
in interfaceBroker
- Overrides:
messageConsumed
in classBrokerFilter
-
messageDelivered
public void messageDelivered(ConnectionContext context, MessageReference messageReference)
Description copied from interface:Broker
Called when message is delivered to the broker- Specified by:
messageDelivered
in interfaceBroker
- Overrides:
messageDelivered
in classBrokerFilter
-
messageDiscarded
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference)
Description copied from interface:Broker
Called when a message is discarded - e.g. running low on memory This will happen only if the policy is enabled - e.g. non durable topics- Specified by:
messageDiscarded
in interfaceBroker
- Overrides:
messageDiscarded
in classBrokerFilter
-
slowConsumer
public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs)
Description copied from interface:Broker
Called when there is a slow consumer- Specified by:
slowConsumer
in interfaceBroker
- Overrides:
slowConsumer
in classBrokerFilter
-
fastProducer
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination)
Description copied from interface:Broker
Called to notify a producer is too fast- Specified by:
fastProducer
in interfaceBroker
- Overrides:
fastProducer
in classBrokerFilter
-
virtualDestinationAdded
public void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination)
- Specified by:
virtualDestinationAdded
in interfaceBroker
- Overrides:
virtualDestinationAdded
in classBrokerFilter
-
virtualDestinationRemoved
public void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination)
- Specified by:
virtualDestinationRemoved
in interfaceBroker
- Overrides:
virtualDestinationRemoved
in classBrokerFilter
-
isFull
public void isFull(ConnectionContext context, Destination destination, Usage<?> usage)
Description copied from interface:Broker
Called when a Usage reaches a limit- Specified by:
isFull
in interfaceBroker
- Overrides:
isFull
in classBrokerFilter
-
nowMasterBroker
public void nowMasterBroker()
Description copied from interface:Broker
called when the broker becomes the master in a master/slave configuration- Specified by:
nowMasterBroker
in interfaceBroker
- Overrides:
nowMasterBroker
in classBrokerFilter
-
sendToDeadLetterQueue
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause)
Description copied from interface:Broker
A message needs to go the a DLQ- Specified by:
sendToDeadLetterQueue
in interfaceBroker
- Overrides:
sendToDeadLetterQueue
in classBrokerFilter
poisonCause
- reason for dlq submission, may be null- Returns:
- true if Message was placed in a DLQ false if discarded.
-
networkBridgeStarted
public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp)
- Specified by:
networkBridgeStarted
in interfaceBroker
- Overrides:
networkBridgeStarted
in classBrokerFilter
-
networkBridgeStopped
public void networkBridgeStopped(BrokerInfo brokerInfo)
- Specified by:
networkBridgeStopped
in interfaceBroker
- Overrides:
networkBridgeStopped
in classBrokerFilter
-
fireAdvisory
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception
- Throws:
Exception
-
fireAdvisory
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception
- Throws:
Exception
-
fireConsumerAdvisory
protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception
- Throws:
Exception
-
fireConsumerAdvisory
protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception
- Throws:
Exception
-
fireProducerAdvisory
protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception
- Throws:
Exception
-
fireProducerAdvisory
protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception
- Throws:
Exception
-
fireAdvisory
public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception
- Throws:
Exception
-
getAdvisoryConnections
public Map<ConnectionId,ConnectionInfo> getAdvisoryConnections()
-
getAdvisoryConsumers
public Collection<ConsumerInfo> getAdvisoryConsumers()
-
getAdvisoryProducers
public Map<ProducerId,ProducerInfo> getAdvisoryProducers()
-
getAdvisoryDestinations
public Map<ActiveMQDestination,DestinationInfo> getAdvisoryDestinations()
-
getVirtualDestinationConsumers
public ConcurrentMap<ConsumerInfo,VirtualDestination> getVirtualDestinationConsumers()
-
-