Examples

The following example gives a brief demonstration for connecting, sending and receiving a message using NMS.

Sync NMS Example
using System;
using Apache.NMS;
using Apache.NMS.Util;

namespace Apache.NMS.ActiveMQ.Test
{
public class TestMain
{
    public static void Main(string[] args)
    {
        // Example connection strings:
        //    activemq:tcp://activemqhost:61616
        //    stomp:tcp://activemqhost:61613
        //    ems:tcp://tibcohost:7222
        //    msmq://localhost

        Uri connecturi = new Uri("activemq:tcp://activemqhost:61616");
        
        Console.WriteLine("About to connect to " + connecturi);

        // NOTE: ensure the nmsprovider-activemq.config file exists in the executable folder.
        IConnectionFactory factory = new NMSConnectionFactory(connecturi);

        using(IConnection connection = factory.CreateConnection())
        using(ISession session = connection.CreateSession())
        {
             // Examples for getting a destination:
             //
             // Hard coded destinations:
             //    IDestination destination = session.GetQueue("FOO.BAR");
             //    Debug.Assert(destination is IQueue);
             //    IDestination destination = session.GetTopic("FOO.BAR");
             //    Debug.Assert(destination is ITopic);
             //
             // Embedded destination type in the name:
             //    IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");
             //    Debug.Assert(destination is IQueue);
             //    IDestination destination = SessionUtil.GetDestination(session, "topic://FOO.BAR");
             //    Debug.Assert(destination is ITopic);
             //
             // Defaults to queue if type is not specified:
             //    IDestination destination = SessionUtil.GetDestination(session, "FOO.BAR");
             //    Debug.Assert(destination is IQueue);
             //
             // .NET 3.5 Supports Extension methods for a simplified syntax:
             //    IDestination destination = session.GetDestination("queue://FOO.BAR");
             //    Debug.Assert(destination is IQueue);
             //    IDestination destination = session.GetDestination("topic://FOO.BAR");
             //    Debug.Assert(destination is ITopic);

            IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");
            Console.WriteLine("Using destination: " + destination);

            // Create a consumer and producer
            using(IMessageConsumer consumer = session.CreateConsumer(destination))
            using(IMessageProducer producer = session.CreateProducer(destination))
            {
                // Start the connection so that messages will be processed.
                connection.Start();
                producer.Persistent = true;

                // Send a message
                ITextMessage request = session.CreateTextMessage("Hello World!");
                request.NMSCorrelationID = "abc";
                request.Properties["NMSXGroupID"] = "cheese";
                request.Properties["myHeader"] = "Cheddar";

                producer.Send(request);

                // Consume a message
                ITextMessage message = consumer.Receive() as ITextMessage;
                if(message == null)
                {
                    Console.WriteLine("No message received!");
                }
                else
                {
                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                    Console.WriteLine("Received message with text: " + message.Text);
                }
            }
        }
    }
}
}

Asynchronous consumption

You have the choice of synchronously pulling messages via the Receive() methods as shown above, or you can use the asynchronous approach demonstrated in the following example:

Async NMS Example

using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;

namespace Apache.NMS.ActiveMQ.Test
{

public class TestMain
{

    protected static AutoResetEvent semaphore = new AutoResetEvent(false);
    protected static ITextMessage message = null;
    protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);

    public static void Main(string[] args)
    {
        // Example connection strings:
        //    activemq:tcp://activemqhost:61616
        //    stomp:tcp://activemqhost:61613
        //    ems:tcp://tibcohost:7222
        //    msmq://localhost

        Uri connecturi = new Uri("activemq:tcp://activemqhost:61616");
        
        Console.WriteLine("About to connect to " + connecturi);

        // NOTE: ensure the nmsprovider-activemq.config file exists in the executable folder.

        IConnectionFactory factory = new NMSConnectionFactory(connecturi);

        using(IConnection connection = factory.CreateConnection())
        using(ISession session = connection.CreateSession())
        {
            // Examples for getting a destination:
            //
            // Hard coded destinations:
            //    IDestination destination = session.GetQueue("FOO.BAR");
            //    Debug.Assert(destination is IQueue);
            //    IDestination destination = session.GetTopic("FOO.BAR");
            //    Debug.Assert(destination is ITopic);
            //
            // Embedded destination type in the name:
            //    IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");
            //    Debug.Assert(destination is IQueue);
            //    IDestination destination = SessionUtil.GetDestination(session, "topic://FOO.BAR");
            //    Debug.Assert(destination is ITopic);
            //
            // Defaults to queue if type is not specified:
            //    IDestination destination = SessionUtil.GetDestination(session, "FOO.BAR");
            //    Debug.Assert(destination is IQueue);
            //
            // .NET 3.5 Supports Extension methods for a simplified syntax:
            //    IDestination destination = session.GetDestination("queue://FOO.BAR");
            //    Debug.Assert(destination is IQueue);
            //    IDestination destination = session.GetDestination("topic://FOO.BAR");
            //    Debug.Assert(destination is ITopic);

            IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");

            Console.WriteLine("Using destination: " + destination);

            // Create a consumer and producer
            using(IMessageConsumer consumer = session.CreateConsumer(destination))
            using(IMessageProducer producer = session.CreateProducer(destination))
            {
                // Start the connection so that messages will be processed.
                connection.Start();
                producer.Persistent = true;
                producer.RequestTimeout = receiveTimeout;
                consumer.Listener += new MessageListener(OnMessage);

                // Send a message
                ITextMessage request = session.CreateTextMessage("Hello World!");
                request.NMSCorrelationID = "abc";
                request.Properties["NMSXGroupID"] = "cheese";
                request.Properties["myHeader"] = "Cheddar";

                producer.Send(request);

                // Wait for the message
                semaphore.WaitOne((int) receiveTimeout.TotalMilliseconds, true);
                if(message == null)
                {
                    Console.WriteLine("No message received!");
                }
                else
                {
                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                    Console.WriteLine("Received message with text: " + message.Text);
                }
            }
        }
    }

    protected static void OnMessage(IMessage receivedMsg)
    {
        message = receivedMsg as ITextMessage;
        semaphore.Set();
    }
}
}

The above uses a C# delegate so that the OnMessage() method will be called whenever a message arrives.

Note that the threading contract is similar to that of JMS - messages are dispatched for a single session's consumers in one thread at once. Consumers in different sessions can process messages concurrently, but consumers in the same session are guarenteed to be called by only one thread at a time (e.g., if you have 3 consumers on a session, then only one of the consumers will be called concurrently).

© 2004-2011 The Apache Software Foundation.
Apache ActiveMQ, ActiveMQ, Apache, the Apache feather logo, and the Apache ActiveMQ project logo are trademarks of The Apache Software Foundation. All other marks mentioned may be trademarks or registered trademarks of their respective owners.
Graphic Design By Hiram