Class AbstractMQTTSubscriptionStrategy
- java.lang.Object
  - org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy
- All Implemented Interfaces: BrokerServiceAware, MQTTSubscriptionStrategy
- Direct Known Subclasses: MQTTDefaultSubscriptionStrategy, MQTTVirtualTopicSubscriptionStrategy
public abstract class AbstractMQTTSubscriptionStrategy extends Object implements MQTTSubscriptionStrategy, BrokerServiceAware
Abstract implementation of the MQTTSubscriptionStrategy interface providing the base functionality that is common to most implementations.
-
Field Summary
Fields (Modifier and Type, Field):
- protected BrokerService brokerService
- protected LongSequenceGenerator consumerIdGenerator
- protected ConcurrentMap<String,MQTTSubscription> mqttSubscriptionByTopic
- protected MQTTProtocolConverter protocol
- protected Set<String> restoredDurableSubs
- protected ConcurrentMap<ConsumerId,MQTTSubscription> subscriptionsByConsumerId
-
Constructor Summary
Constructors:
- AbstractMQTTSubscriptionStrategy()
-
Method Summary
Methods (Modifier and Type, Method, Description):
- protected void deleteDurableSubs(List<SubscriptionInfo> subs)
- protected byte doSubscribe(ConsumerInfo consumerInfo, String topicName, org.fusesource.mqtt.client.QoS qoS)
- void doUnSubscribe(MQTTSubscription subscription)
- protected ConsumerId getNextConsumerId()
- MQTTProtocolConverter getProtocolConverter()
- MQTTSubscription getSubscription(ConsumerId consumerId): Look up an MQTTSubscription instance based on a known ConsumerId value.
- void initialize(MQTTProtocolConverter protocol): Initialize the strategy before first use.
- boolean isControlTopic(ActiveMQDestination destination): Allows the protocol handler to interrogate a destination name to determine if it is equivalent to the MQTT control topic (starts with $).
- protected List<SubscriptionInfo> lookupSubscription(String clientId)
- void onReSubscribe(MQTTSubscription mqttSubscription): Called when a client sends a duplicate subscribe request which should force any retained messages on that topic to be replayed again as though the client had just subscribed for the first time.
- ActiveMQDestination onSend(String topicName): Intercepts PUBLISH operations from the client and allows the strategy to map the target destination so that the send operation will land in the destinations that this strategy has mapped the incoming subscribe requests to.
- String onSend(ActiveMQDestination destination): Intercepts send operations from the broker and allows the strategy to map the target topic name so that the client sees a valid Topic name.
- byte onSubscribe(org.fusesource.mqtt.client.Topic topic): Called for each Topic that a client requests to subscribe to.
- protected void restoreDurableSubs(List<SubscriptionInfo> subs)
- void setBrokerService(BrokerService brokerService)
- void setProtocolConverter(MQTTProtocolConverter parent): Sets the MQTTProtocolConverter that is the parent of this strategy object.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy
onConnect, onSubscribe, onUnSubscribe
-
Field Detail
-
protocol
protected MQTTProtocolConverter protocol
-
brokerService
protected BrokerService brokerService
-
subscriptionsByConsumerId
protected final ConcurrentMap<ConsumerId,MQTTSubscription> subscriptionsByConsumerId
-
mqttSubscriptionByTopic
protected final ConcurrentMap<String,MQTTSubscription> mqttSubscriptionByTopic
-
consumerIdGenerator
protected final LongSequenceGenerator consumerIdGenerator
-
Method Detail
-
initialize
public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException
Description copied from interface: MQTTSubscriptionStrategy
Initialize the strategy before first use.
- Specified by: initialize in interface MQTTSubscriptionStrategy
- Parameters: protocol - the MQTTProtocolConverter that is initializing the strategy.
- Throws: MQTTProtocolException - if an error occurs during initialization.
-
setBrokerService
public void setBrokerService(BrokerService brokerService)
- Specified by: setBrokerService in interface BrokerServiceAware
-
setProtocolConverter
public void setProtocolConverter(MQTTProtocolConverter parent)
Description copied from interface: MQTTSubscriptionStrategy
Sets the MQTTProtocolConverter that is the parent of this strategy object.
- Specified by: setProtocolConverter in interface MQTTSubscriptionStrategy
- Parameters: parent - the MQTTProtocolConverter that owns this strategy.
-
getProtocolConverter
public MQTTProtocolConverter getProtocolConverter()
- Specified by: getProtocolConverter in interface MQTTSubscriptionStrategy
- Returns: the MQTTProtocolConverter that owns this strategy.
-
onSubscribe
public byte onSubscribe(org.fusesource.mqtt.client.Topic topic) throws MQTTProtocolException
Description copied from interface: MQTTSubscriptionStrategy
Called for each Topic that a client requests to subscribe to. The strategy needs to check each Topic for duplicate subscription requests and for changes in QoS state.
- Specified by: onSubscribe in interface MQTTSubscriptionStrategy
- Parameters: topic - the MQTT Topic instance being subscribed to.
- Returns: the assigned QoS value given to the new subscription.
- Throws: MQTTProtocolException - if an error occurs while processing the subscribe actions.
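The duplicate and QoS-change check described above can be sketched in isolation. This is a minimal illustration, not the ActiveMQ implementation: the class `SubscribeCheckSketch`, its `Action` enum, and the use of plain `int` QoS levels are all hypothetical stand-ins, and the map only loosely mirrors the role of `mqttSubscriptionByTopic`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the duplicate-subscription check; not ActiveMQ code. */
final class SubscribeCheckSketch {

    /** Outcome of examining one subscribe request. */
    enum Action { NEW_SUBSCRIPTION, RESUBSCRIBE_SAME_QOS, QOS_CHANGED }

    // Topic name -> QoS level (0, 1 or 2) most recently granted for that topic.
    private final ConcurrentMap<String, Integer> qosByTopic = new ConcurrentHashMap<>();

    /** Records the request and reports how it relates to any earlier one. */
    Action onSubscribe(String topicName, int requestedQos) {
        if (requestedQos < 0 || requestedQos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2: " + requestedQos);
        }
        Integer previous = qosByTopic.put(topicName, requestedQos);
        if (previous == null) {
            return Action.NEW_SUBSCRIPTION;
        }
        return previous == requestedQos ? Action.RESUBSCRIBE_SAME_QOS : Action.QOS_CHANGED;
    }

    public static void main(String[] args) {
        SubscribeCheckSketch sketch = new SubscribeCheckSketch();
        System.out.println(sketch.onSubscribe("sensors/temp", 1)); // first request for the topic
        System.out.println(sketch.onSubscribe("sensors/temp", 1)); // duplicate, same QoS
        System.out.println(sketch.onSubscribe("sensors/temp", 2)); // duplicate, different QoS
    }
}
```

A real strategy would presumably route the duplicate case to something like onReSubscribe and recreate the consumer when the QoS changes; the sketch only classifies the request.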
-
onReSubscribe
public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException
Description copied from interface: MQTTSubscriptionStrategy
Called when a client sends a duplicate subscribe request which should force any retained messages on that topic to be replayed again as though the client had just subscribed for the first time. The method should not unsubscribe the client as it might miss messages sent while the subscription is being recreated.
- Specified by: onReSubscribe in interface MQTTSubscriptionStrategy
- Parameters: mqttSubscription - the MQTTSubscription that contains the subscription state.
- Throws: MQTTProtocolException
-
onSend
public ActiveMQDestination onSend(String topicName)
Description copied from interface: MQTTSubscriptionStrategy
Intercepts PUBLISH operations from the client and allows the strategy to map the target destination so that the send operation will land in the destinations that this strategy has mapped the incoming subscribe requests to.
- Specified by: onSend in interface MQTTSubscriptionStrategy
- Parameters: topicName - the targeted Topic that the client sent the message to.
- Returns: an ActiveMQ Topic instance that lands the send in the correct destinations.
-
onSend
public String onSend(ActiveMQDestination destination)
Description copied from interface: MQTTSubscriptionStrategy
Intercepts send operations from the broker and allows the strategy to map the target topic name so that the client sees a valid Topic name.
- Specified by: onSend in interface MQTTSubscriptionStrategy
- Parameters: destination - the destination that the message was dispatched from.
- Returns: a Topic name that is valid for the receiving client.
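The two onSend overloads map names in opposite directions: client topic name to broker destination, and broker destination back to a client-visible topic name. The sketch below illustrates one plausible mapping on plain strings. It assumes MQTT uses `/` as a level separator with `+` and `#` as wildcards, and that the broker side uses `.` with `*` and `>`; the class `TopicNameMappingSketch` and its methods are hypothetical, and a concrete strategy may map names differently (for example by adding a prefix).

```java
/** Hypothetical name mapping between MQTT topic names and broker-style destination names. */
final class TopicNameMappingSketch {

    /** Swaps each separator or wildcard with its counterpart; applying it twice restores the input. */
    private static String swap(String name) {
        char[] chars = name.toCharArray();
        for (int i = 0; i < chars.length; i++) {
            switch (chars[i]) {
                case '/': chars[i] = '.'; break;
                case '.': chars[i] = '/'; break;
                case '+': chars[i] = '*'; break;
                case '*': chars[i] = '+'; break;
                case '#': chars[i] = '>'; break;
                case '>': chars[i] = '#'; break;
                default: break; // every other character is kept as-is
            }
        }
        return new String(chars);
    }

    /** Client to broker direction, analogous to onSend(String topicName). */
    static String toBrokerName(String mqttTopic) {
        return swap(mqttTopic);
    }

    /** Broker to client direction, analogous to onSend(ActiveMQDestination destination). */
    static String toMqttName(String brokerName) {
        return swap(brokerName);
    }

    public static void main(String[] args) {
        String broker = toBrokerName("sensors/+/temp");
        System.out.println(broker);             // sensors.*.temp
        System.out.println(toMqttName(broker)); // sensors/+/temp
    }
}
```

Because the swap is symmetric, one helper serves both directions, which keeps the round trip lossless for any name.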
-
isControlTopic
public boolean isControlTopic(ActiveMQDestination destination)
Description copied from interface: MQTTSubscriptionStrategy
Allows the protocol handler to interrogate a destination name to determine if it is equivalent to the MQTT control topic (starts with $). Because the strategy might alter the naming scheme of the destinations it maps, it must provide a way to reverse-map a destination and determine whether it was originally an MQTT control topic.
- Specified by: isControlTopic in interface MQTTSubscriptionStrategy
- Parameters: destination - the destination to query.
- Returns: true if the destination is an MQTT control topic.
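The reverse-mapping requirement can be illustrated with a strategy that renames destinations. The sketch below is an assumption-laden example on plain strings: `ControlTopicSketch`, the `MAPPED_PREFIX` value, and both methods are hypothetical and do not reflect how any ActiveMQ strategy actually names its destinations.

```java
/** Hypothetical check showing why a renaming strategy must undo its mapping first. */
final class ControlTopicSketch {

    // Illustrative prefix that a strategy might prepend to every mapped destination.
    static final String MAPPED_PREFIX = "Mapped.";

    /** Forward mapping: prefix the name and use '.' as the level separator. */
    static String map(String mqttTopic) {
        return MAPPED_PREFIX + mqttTopic.replace('/', '.');
    }

    /** Strips the strategy's prefix, if present, then tests for the leading '$'. */
    static boolean isControlTopic(String destinationName) {
        String original = destinationName.startsWith(MAPPED_PREFIX)
                ? destinationName.substring(MAPPED_PREFIX.length())
                : destinationName;
        return original.startsWith("$");
    }

    public static void main(String[] args) {
        System.out.println(isControlTopic(map("$SYS/broker/uptime"))); // true
        System.out.println(isControlTopic(map("sensors/temp")));       // false
    }
}
```

Testing the mapped name directly for a leading `$` would fail here, since the mapped name starts with the prefix; that is the case the reverse mapping exists to handle.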
-
getSubscription
public MQTTSubscription getSubscription(ConsumerId consumerId)
Description copied from interface: MQTTSubscriptionStrategy
Look up an MQTTSubscription instance based on a known ConsumerId value.
- Specified by: getSubscription in interface MQTTSubscriptionStrategy
- Parameters: consumerId - the consumer ID to look up.
- Returns: the MQTTSubscription for the consumer or null if no subscription exists.
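Since the lookup returns null rather than throwing when nothing is registered, callers need an explicit null check. The following sketch shows that pattern with a map-backed lookup; `SubscriptionLookupSketch` is hypothetical, and plain `String` values stand in for both `ConsumerId` and `MQTTSubscription`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical map-backed lookup; String replaces ConsumerId and MQTTSubscription. */
final class SubscriptionLookupSketch {

    // Plays the role that subscriptionsByConsumerId plays in the documented class.
    private final ConcurrentMap<String, String> subscriptionsByConsumerId = new ConcurrentHashMap<>();

    void register(String consumerId, String topicName) {
        subscriptionsByConsumerId.put(consumerId, topicName);
    }

    /** Returns the registered topic, or null when the consumer ID is unknown. */
    String getSubscription(String consumerId) {
        return subscriptionsByConsumerId.get(consumerId);
    }

    /** Caller-side handling of the null result. */
    String describe(String consumerId) {
        String topic = getSubscription(consumerId);
        return topic == null
                ? "no subscription for " + consumerId
                : consumerId + " -> " + topic;
    }

    public static void main(String[] args) {
        SubscriptionLookupSketch sketch = new SubscriptionLookupSketch();
        sketch.register("consumer-1", "sensors/temp");
        System.out.println(sketch.describe("consumer-1")); // consumer-1 -> sensors/temp
        System.out.println(sketch.describe("consumer-2")); // no subscription for consumer-2
    }
}
```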
-
getNextConsumerId
protected ConsumerId getNextConsumerId()
-
doSubscribe
protected byte doSubscribe(ConsumerInfo consumerInfo, String topicName, org.fusesource.mqtt.client.QoS qoS) throws MQTTProtocolException
- Throws: MQTTProtocolException
-
doUnSubscribe
public void doUnSubscribe(MQTTSubscription subscription)
-
deleteDurableSubs
protected void deleteDurableSubs(List<SubscriptionInfo> subs)
-
restoreDurableSubs
protected void restoreDurableSubs(List<SubscriptionInfo> subs)
-
lookupSubscription
protected List<SubscriptionInfo> lookupSubscription(String clientId) throws MQTTProtocolException
- Throws: MQTTProtocolException
-
-