Examples
The following example gives a brief demonstration of connecting, sending, and receiving a message using NMS.
using System;
using Apache.NMS;
using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Test
{
    /// <summary>
    /// Minimal NMS example: connects to a broker, sends one text message to a
    /// destination, and pulls a message back synchronously on the same session.
    /// </summary>
    public class TestMain
    {
        /// <summary>
        /// Entry point. Creates a connection and session, sends "Hello World!"
        /// to the destination, then receives a message and prints its ID and text.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        public static void Main(string[] args)
        {
            // Placeholder broker address: replace the host and port with those
            // of your own broker.
            Uri connecturi = new Uri("activemq:tcp://activemqhost:61616");

            Console.WriteLine("About to connect to " + connecturi);

            IConnectionFactory factory = new NMSConnectionFactory(connecturi);

            // Both the connection and the session are disposed when the block exits.
            using(IConnection connection = factory.CreateConnection())
            using(ISession session = connection.CreateSession())
            {
                // Placeholder destination name: replace with your own queue.
                IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");

                Console.WriteLine("Using destination: " + destination);

                // The consumer and producer share the destination, so the message
                // sent below is the one this example expects to read back.
                using(IMessageConsumer consumer = session.CreateConsumer(destination))
                using(IMessageProducer producer = session.CreateProducer(destination))
                {
                    connection.Start();
                    producer.Persistent = true;

                    // Build the request and attach a correlation ID and custom properties.
                    ITextMessage request = session.CreateTextMessage("Hello World!");
                    request.NMSCorrelationID = "abc";
                    request.Properties["NMSXGroupID"] = "cheese";
                    request.Properties["myHeader"] = "Cheddar";

                    producer.Send(request);

                    // Synchronously pull a message. The 'as' cast yields null when
                    // Receive() returns null or a message that is not an ITextMessage.
                    ITextMessage message = consumer.Receive() as ITextMessage;
                    if(message == null)
                    {
                        Console.WriteLine("No message received!");
                    }
                    else
                    {
                        Console.WriteLine("Received message with ID: " + message.NMSMessageId);
                        Console.WriteLine("Received message with text: " + message.Text);
                    }
                }
            }
        }
    }
}
Asynchronous consumption
You have the choice of synchronously pulling messages via the Receive() methods as shown above, or you can use the asynchronous approach demonstrated in the following example:
using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Test
{
    /// <summary>
    /// Minimal NMS example of asynchronous consumption: a listener delegate
    /// stores the received message and signals the main thread, which waits
    /// for that signal for a bounded time.
    /// </summary>
    public class TestMain
    {
        /// <summary>Signalled by <see cref="OnMessage"/> once a message has been stored.</summary>
        protected static AutoResetEvent semaphore = new AutoResetEvent(false);

        /// <summary>The message stored by <see cref="OnMessage"/>; null until one is stored.</summary>
        protected static ITextMessage message = null;

        /// <summary>
        /// Used both as the producer's request timeout and as the maximum time
        /// <see cref="Main"/> waits for the listener to signal.
        /// </summary>
        protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);

        /// <summary>
        /// Entry point. Creates a connection and session, registers
        /// <see cref="OnMessage"/> as the consumer's listener, sends
        /// "Hello World!", then waits for the listener to signal and prints
        /// the stored message's ID and text.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        public static void Main(string[] args)
        {
            // Placeholder broker address: replace the host and port with those
            // of your own broker.
            Uri connecturi = new Uri("activemq:tcp://activemqhost:61616");

            Console.WriteLine("About to connect to " + connecturi);

            IConnectionFactory factory = new NMSConnectionFactory(connecturi);

            // Both the connection and the session are disposed when the block exits.
            using(IConnection connection = factory.CreateConnection())
            using(ISession session = connection.CreateSession())
            {
                // Placeholder destination name: replace with your own queue.
                IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");

                Console.WriteLine("Using destination: " + destination);

                using(IMessageConsumer consumer = session.CreateConsumer(destination))
                using(IMessageProducer producer = session.CreateProducer(destination))
                {
                    connection.Start();
                    producer.Persistent = true;
                    producer.RequestTimeout = receiveTimeout;

                    // Register the listener before sending so the message is
                    // handed to OnMessage rather than pulled with Receive().
                    consumer.Listener += new MessageListener(OnMessage);

                    // Build the request and attach a correlation ID and custom properties.
                    ITextMessage request = session.CreateTextMessage("Hello World!");
                    request.NMSCorrelationID = "abc";
                    request.Properties["NMSXGroupID"] = "cheese";
                    request.Properties["myHeader"] = "Cheddar";

                    producer.Send(request);

                    // Block until OnMessage signals or the timeout elapses; on
                    // timeout 'message' is still null.
                    semaphore.WaitOne((int) receiveTimeout.TotalMilliseconds, true);

                    if(message == null)
                    {
                        Console.WriteLine("No message received!");
                    }
                    else
                    {
                        Console.WriteLine("Received message with ID: " + message.NMSMessageId);
                        Console.WriteLine("Received message with text: " + message.Text);
                    }
                }
            }
        }

        /// <summary>
        /// Listener callback: stores the incoming message (null if it is not an
        /// <c>ITextMessage</c>) and signals the waiting main thread.
        /// </summary>
        /// <param name="receivedMsg">The message passed to the listener.</param>
        protected static void OnMessage(IMessage receivedMsg)
        {
            message = receivedMsg as ITextMessage;
            semaphore.Set();
        }
    }
}
The above uses a C# delegate so that the OnMessage() method will be called whenever a message arrives.
Note that the threading contract is similar to that of JMS: messages are dispatched to a single session's consumers on one thread at a time. Consumers in different sessions can process messages concurrently, but consumers in the same session are guaranteed to be called by only one thread at a time (e.g., if you have 3 consumers on a session, only one of them will be called at any given time).