ActiveMQ supports advisory messages which allows you to watch the system using regular CMS messages. A few things that you can currently do with advisory messages are:

  • See consumers, producers and connections starting and stopping
  • See temporary destinations being created and destroyed
  • Get notified messages expiring on topics and queues
  • Observe brokers sending messages to destinations with no consumers.
  • See connections starting and stopping

Advisory messages can be thought as some kind of administrative channel where you receive information regarding what is happening on your JMS provider along with what's happening with producers, consumers and destinations. To learn more about the broker's advisory message support see this article.

Before You Begin

This tutorial assumes that the reader has a working knowledge of the CMS API and knows how to build basic applications using the ActiveMQ-CPP Library. If you aren't sure how to use the CMS API then you should first read the CMS API Overview. The tutorial is written against the ActiveMQ-CPP 3.0 release API and while it is possible to handle advisory messages using the older ActiveMQ-CPP 2.x clients there may be some code differences which are not shown in this article.

Subscribing to Advisory Topics

To receive advisory message's you first need to subscribe to the Topic that supplies the advisory message you want. You subscribe to these Topics like you would any other destination, the trick is to use the correct name for the Topic that you want to use. Lets first take a look at a couple of the types available for use (this is not a complete set).

Advisory TopicsDescriptionpropertiesData Structure
ActiveMQ.Advisory.Connection Connection start & stop messages
ActiveMQ.Advisory.Producer.Queue Producer start & stop messages on a Queue String='producerCount' - the number of producers ProducerInfo
ActiveMQ.Advisory.Producer.Topic Producer start & stop messages on a Topic String='producerCount' - the number of producersProducerInfo
ActiveMQ.Advisory.Consumer.Queue Consumer start & stop messages on a Queue String='consumerCount' - the number of ConsumersConsumerInfo
ActiveMQ.Advisory.Consumer.Topic Consumer start & stop messages on a Topic String='consumerCount' - the number of ConsumersConsumerInfo

Now, looking at the above list lets pick one of the Topics and figure out how we would create the CMS Topic to subscribe to in order to receive the advisory messages. Using the Java client we could create the Destination using the utility class AdvisorySupport but for now ActiveMQ-CPP doesn't provide such a utility class, hopefully we will see this added in a future release (Contributions are of course always welcomed!).

If we had a Topic named TOPIC.FOO and we wanted to know when a producer subscribed to that Topic we would need to create a Topic object whose name is ActiveMQ.Advisory.Producer.Topic.TOPIC.FOO in order to receive the advisory message we are interested in. We know this because we can look at the above table and see that the ActiveMQ.Advisory.Producer.Topic is informed whenever a Producer starts or stops publishing messages on a Topic and we also know that our Topic is named TOPIC.FOO, so adding them together gets us the name of our Advisory Topic, we also know this because we peaked at the class, and no, that's not cheating. Below is a code snippet showing the creation of the Topic using a CMS Session:

Creating an Advisory Topic for Producers on TOPIC.FOO

Once we create the Topic for the advisory messages we want to listen for we just need to create a consumer to listen for them, the code snippet below demonstrates this:

Creating an Consumer for an Advisory Topic.

As you can see there is no difference between subscriptions to advisory topics that subscriptions to any other Destination in CMS. In the above example we registered as an asynchronous listener, you can also use the normal blocking receive method but we prefer this method.

Dealing with incoming Advisory Messages

Every Advisory has the message type 'Advisory' and some predefined message properties, to check a CMS Message for this type you would call the getCMSType method on a Message object. In some cases you will know that the only message you are going to receive are advisory messages because your client only subscribes to advisory topics, other times you might have wired multiple MessageConsumers to the same MessageListener in which case you will have to check the message tppe. Once you know you are dealing with an advisory message then you can start to inspect it to determine what sort of message it is and extract the data that is meaningful to your application.

Many of the advisory messages store meaningful data in the Message properties, for instance the Consumer Start / Stop advisory message contains an element with the key consumerCount which is populated with the current number of active consumers on the Topic or Queue in question. Lets take a look at a code snippet now that checks the messages received in an onMessage callback to see if its an advisory message and acts on it if it is:

Using the CMS Type to determine if a Message is an Advisory

Complete Example: Producer that only Produces when there is a Consumer

Now that you've seen the basics of advisory message processing its time to show you a complete example that demonstrates what you can do with advisory messages. The following code shows a class header and source file that implements a basic CMS Producer that will send heart beat message to a Topic called HEART-BEAT-CHANNEL only when there are active consumers, otherwise it sits idle.

AdvisoryProducer Header file
AdvisoryProducer Source file

*Advanced Topic* Dealing with Command Objects in Advisories

If you read the ActiveMQ Advisory Message article mentioned at the start of this tutorial then you know that certain advisory messages can contain an embedded command object. If you didn't read that article then this section is going to be very confusing, so go read it. We can access those command object in CMS with a little work which means we can take full advantage of the Advisory Message feature.

All Advisory Messages are sent as a basic ActiveMQMessage to your client. The underlying type hierarchy in ActiveMQ-CPP is the same as that of ActiveMQ so the names of the embedded command objects you saw in the Advisory article are the same and they contain mostly the same information, although sometimes the information is encoded in a more C++ friendly or unfriendly way depending on your point of view.

To demonstrate how we can access the command objects lets try and create a client application that listens to the Broker for advisories that indicate that Temporary Destinations have either been created or destroyed. The Broker will publish advisory messages to the "ActiveMQ.Advisory.TempTopic" and "ActiveMQ.Advisory.TempQueue" Topics whenever the corresponding Temporary Destination is created or destroyed and the command object will be of type DestinationInfo. The DestinationInfo object contains a Destination object describing the Destination in question and an Operation Type value telling whether the command is a create or destroy command. First lets look at how we subscribe to this Advisory Topic:

Subscribing to a Composite Advisory Topic

As you can see in the above code snippet we just create a new Topic object whose name is a composite of the two Topics we want to subscribe on, this will allow our single MessageConsumer instance to receive both Temporary Topic and Temporary Queue advisories. As before we also create a MessageConsumer and register our class' instance as the asynchronous listener. Now all that's left is to implement the onMessage method of the MessageListener interface, lets take a look at that code now:

Processing an Advisory message with an embedded command object

Fortunately for use the code above looks more complicated than it really is, lets walk through it a bit more slowly now to understand what is going on:

Getting to the ActiveMQMessage object

The first thing we need to do is check that we received an advisory message, ActiveMQ encodes the Message Type as "Advisory" so that's easy enough. We don't technically need to do this here since our consumer only listens on an advisory Topic but its not a bad idea to check. Once we know its an advisory message we know that the message pointer should be of type ActiveMQMessage under that generic cms::Message disguise its wearing so we use a dynamic_cast to convert it. Now that we've converted to an ActiveMQMessage what's next, well lets take a look:

Checking for an embedded command object

Every ActiveMQMessage derived object has a method called getDataStructure which can be used for all sorts of useful things, here we are trying to see if there is a command object contained in this message, and you guessed it, the getDataStructure method will tell us if there is one. If there is then we can move onto checking for a DestinationInfo object:

Getting to the DestinationInfo object

First thing you are probably asking about this code snippet is "what is that Pointer thing?", that is a thread safe smart pointer that is used internally by ActiveMQ-CPP to manage all the pointers that make up the cms::Message objects among other things. We create an instance of a Pointer<DestinationInfo> type which will pointer to our DestinationInfo command if the dynamicCast method is able to make that conversion, if not a ClassCastException is thrown. Once we have the DestinationInfo pointer we can retrieve the Operation Type of the command and then compare it to the constants in ActiveMQConstants to see what is being done to the Destination. There are only two operation types, add and remove, but since the DestinationInfo object encodes the operation type value as an unsigned char we provide a fall-back case to alert us to that error. We are almost done now, all that remains is to output what happened, and also let the user know what the name of the Destination is, the getPhysicalName method in the Destination class tells us that. You could also find out whether the Destination is a Topic or a Queue using the Destination object, we leave that as an exercise to the reader.

Complete Example: Consumer that listens for Creation and Destruction of Temporary Destinations

The complete code of our client application is shown below, you can also find this code as well as a simple client that creates both a Temporary Topic and a Temporary Queue in the examples folder in the source distribution.

TempDestinationAdvisoryConsumer Header File
TempDestinationAdvisoryConsumer Source File
© 2004-2011 The Apache Software Foundation.
Apache ActiveMQ, ActiveMQ, Apache, the Apache feather logo, and the Apache ActiveMQ project logo are trademarks of The Apache Software Foundation. All other marks mentioned may be trademarks or registered trademarks of their respective owners.
Graphic Design By Hiram