Chapter 31. Message Oriented Middleware - Apache ActiveMQ and TIBCO EMS

31.1. Introduction

The goal of Spring's messaging support is to increase your productivity when writing enterprise-strength messaging middleware applications. Spring achieves this in several ways. First, it provides several helper classes that relieve the developer of the incidental complexity and resource management issues that arise when using messaging APIs. Second, the design of these helper classes promotes best practices in designing a messaging application by encouraging a clear separation between messaging middleware specific code and business processing that is technology agnostic. This is generally referred to as a "plain old CLR object" (or POCO) programming model.

This chapter discusses Spring's messaging support for providers whose API was modeled after the Java Message Service (JMS) API. Vendors who provide a JMS inspired API include Apache ActiveMQ, TIBCO, IBM, and Progress Software. If you are using Microsoft's Message Queue, please refer to the specific MSMQ section. The description of Spring's messaging features in this chapter applies to all of these JMS vendors. However, the documentation focuses on showing code examples that use Apache ActiveMQ. For code examples and some features specific to TIBCO EMS, please refer to the TIBCO EMS chapter.

31.1.1. Multiple Vendor Support

As there is no de facto standard common API across messaging vendors, Spring provides an implementation of its helper classes for each of the major messaging middleware vendors. The classes you will interact with most frequently will either have identical names for each provider but be located in a different namespace, or have a prefix that is the three-letter acronym commonly associated with the message provider. The providers supported by Spring are shown below along with their namespace and prefix.

  1. Apache ActiveMQ (NMS) in namespace Spring.Messaging.Nms. 'Nms' is used as the class prefix.

  2. TIBCO EMS in namespace Spring.Messaging.Ems. 'Ems' is used as the class prefix.

  3. WebSphere MQ in namespace Spring.Messaging.Xms. 'Xms' is used as the class prefix (in a future release).

JMS can be roughly divided into two areas of functionality, namely the production and consumption of messages. For message production and the synchronous consumption of messages a template class, named NmsTemplate, EmsTemplate (etc.), is used. Asynchronous message consumption is performed through a multi-threaded message listener container, SimpleMessageListenerContainer. This message listener container is used to create Message-Driven POCOs (MDPs), which are messaging callback classes that consist of just 'plain CLR objects' and are devoid of any specific messaging types or other artifacts. The IMessageConverter interface is used by both the template class and the message listener container to convert between provider message types and POCOs.

The namespace Spring.Messaging.<VendorAcronym>.Core contains the messaging template class (e.g. NmsTemplate). The template class simplifies the use of the messaging APIs by handling the creation and release of resources, much like the AdoTemplate does for ADO.NET. The JMS inspired APIs are low-level APIs, much like ADO.NET. As such, even the simplest of operations requires tens of lines of code, with the bulk of that code related to resource management of intermediate API objects. Spring's messaging support, both in Java and .NET, addresses the error-prone boilerplate coding style one needs when using these APIs.

The design principle common to Spring template classes is to provide helper methods to perform common operations and, for more sophisticated usage, delegate the essence of the processing task to user implemented callback interfaces. The messaging template follows the same design. The message template class offers various convenience methods for sending messages, consuming a message synchronously, and exposing the message Session and MessageProducer to the user.

The namespace Spring.Messaging.<VendorAcronym>.Support.Converter provides an IMessageConverter abstraction to convert between .NET objects and messages. The namespace Spring.Messaging.<VendorAcronym>.Support.Destinations provides various strategies for managing destinations, such as providing a service locator for destinations stored in a directory service.

Finally, the namespace Spring.Messaging.<VendorAcronym>.Connections provides an implementation of the ConnectionFactory suitable for use in standalone applications.

The remaining sections in this chapter discuss each of the major helper classes in detail. Please refer to the sample application that ships with Spring for additional hands-on usage.

Note

To simplify documenting features that are common across all provider implementations of Spring's helper classes, a specific provider, Apache ActiveMQ, was selected. As such, when you see 'NmsTemplate' in the documentation, it also refers to EmsTemplate, XmsTemplate, etc. unless specifically documented otherwise. The provider specific API classes are typically named after their JMS counterparts, with the possible exception of a leading 'I' in front of interfaces in order to follow .NET naming conventions. In the documentation these API artifacts are referred to as 'ConnectionFactory', 'Session', 'Message', etc. without the leading 'I'.

Note

To view the parts of this chapter's content that are based on TIBCO EMS, please refer to the TIBCO EMS chapter.

31.1.2. Separation of Concerns

The use of MessageConverters and a POCO programming model promotes messaging best practices by applying the principle of Separation of Concerns to messaging based architectures. The infrastructure concern of publishing and consuming messages is separated from the concern of business processing. These two concerns are reflected in the architecture as two distinct layers, a message processing layer and a business processing layer. The benefit of this approach is that your business processing is decoupled from the messaging technology, making it more likely to survive technological changes over time and also easier to test. Spring's MessageConverters provide support for mapping messaging data types to POCOs. Aside from being the link between the two layers, MessageConverters provide a pluggable strategy to help support the evolution of a loosely coupled architecture over time. Message formats will change over time, typically by the addition of new fields. MessageConverters can be implemented to detect different versions of messages and perform the appropriate mapping logic to POCOs so that multiple versions of a message can be supported simultaneously, a common requirement in enterprise messaging architectures.

31.1.3. Interoperability and provider portability

Messaging is a traditional area of interoperability across heterogeneous systems, with messaging vendors providing support on multiple operating systems (Windows, UNIX, mainframe operating systems) as well as multiple language bindings (C, C++, Java, .NET, Perl, etc.). In the 1990s the Java Community Process came up with a specification to provide a common API across messaging providers as well as define some common messaging functionality. This specification is known as the Java Message Service. From the API perspective, it can roughly be thought of as the messaging counterpart to the ADO.NET or JDBC APIs that provide portability across different database providers.

Given this history, when messaging vendors created their .NET APIs, many did so by creating their own JMS inspired API in .NET. There is no de facto standard common API across messaging vendors. As such, portability across vendors using Spring's helper classes is achieved by changing the configuration schema in your configuration to a particular vendor and doing a 'search-and-replace' on the code base, changing the namespace and a few class names. While not ideal, using Spring will push you in the direction of isolating the messaging specific classes in their own layer and will therefore reduce the impact of the changes you make to the code when switching providers. Your business logic classes called via Spring's messaging infrastructure will remain the same.

The NMS project from Apache addresses the lack of a common API across .NET messaging providers by providing an abstract, interface based API for messaging and several implementations for different providers. At the time of this writing, the project is close to releasing a 1.0 version that supports ActiveMQ, MSMQ, and TIBCO EMS. There are a few outstanding issues at the moment that prevent one from using NMS as a common API for all messaging providers, but hopefully these issues will be resolved. Note that NMS serves 'double' duty as the preferred API for messaging with ActiveMQ as well as providing portability across different messaging providers.

31.1.4. The role of Messaging API in a 'WCF world'

Windows Communication Foundation (WCF) also supports message oriented middleware. Not surprisingly, a Microsoft Message Queuing (MSMQ) binding is provided as part of WCF. The WCF programming model is higher level than the traditional messaging APIs such as JMS and NMS, since you are programming to a service interface and use metadata (either XML or attributes) to configure the messaging behavior. If you prefer to use this service-oriented, RPC style approach, then look to see if a vendor provides a WCF binding for your messaging provider. Note that even with the option of using WCF, many people prefer to sit 'closer to the metal' when using messaging middleware, either to access specific features and functionality not available in WCF or simply because they are more comfortable with that programming model.

A WCF binding for Apache NMS is being developed as a separate project under the Spring Extensions umbrella project. Stay tuned for details.

31.2. Using Spring Messaging

31.2.1. Messaging Template overview

Code that uses the messaging template classes (NmsTemplate, EmsTemplate, etc.) only needs to implement callback interfaces, giving it a clearly defined contract. The IMessageCreator callback interface creates a message given a Session provided by the calling code in NmsTemplate. In order to allow for more complex usage of the provider messaging API, the callback ISessionCallback provides the user with the provider specific messaging Session, and the callback IProducerCallback exposes a provider specific Session and MessageProducer pair. See Section 31.4, “Session and Producer Callback”.

Provider messaging APIs typically expose two types of send methods: one that takes delivery mode, priority, and time-to-live as quality of service (QOS) parameters, and one that takes no QOS parameters and uses default values. Since there are many higher level send methods in NmsTemplate, the setting of the QOS parameters has been exposed as properties on the template class to avoid multiplying the number of send methods. Similarly, the timeout value for synchronous receive calls is set using the property ReceiveTimeout.
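As a rough illustration of configuring these settings on the template rather than on each send call, consider the sketch below. Only ReceiveTimeout is named in this chapter; the property names ExplicitQosEnabled and Priority, and the assumption that ReceiveTimeout is expressed in milliseconds, are assumptions that should be checked against the API documentation for your Spring version.

```csharp
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Spring.Messaging.Nms.Core;

public static class TemplateFactory
{
    public static NmsTemplate CreateTemplate()
    {
        NmsTemplate template = new NmsTemplate(new ConnectionFactory("tcp://localhost:61616"));

        // Give up on a synchronous receive after five seconds
        // (assumption: the value is interpreted as milliseconds).
        template.ReceiveTimeout = 5000;

        // Assumed property names: ask the template to apply its own QOS
        // values on send instead of the provider defaults.
        template.ExplicitQosEnabled = true;
        template.Priority = MsgPriority.Low;

        return template;
    }
}
```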

Note

Instances of the NmsTemplate class are thread-safe once configured. This is important because it means that you can configure a single instance of a NmsTemplate and then safely inject this shared reference into multiple collaborators. To be clear, the NmsTemplate is stateful, in that it maintains a reference to a ConnectionFactory, but this state is not conversational state.

31.2.2. Connections

The NmsTemplate requires a reference to a ConnectionFactory. The ConnectionFactory serves as the entry point for working with the provider's messaging API. It is used by the client application as a factory to create connections to the messaging server and encapsulates various configuration parameters, many of which are vendor specific such as SSL configuration options.

To create an ActiveMQ ConnectionFactory, create an object definition as shown below:

  <object id="nmsConnectionFactory" type="Apache.NMS.ActiveMQ.ConnectionFactory, Apache.NMS.ActiveMQ">
    <constructor-arg index="0" value="tcp://localhost:61616"/>
  </object>

EmsTemplate also requires a reference to a ConnectionFactory; however, it is not the 'native' TIBCO.EMS.ConnectionFactory. Instead, the connection factory type is Spring.Messaging.Ems.Common.IConnectionFactory. See the documentation on TIBCO EMS support for more information.

31.2.3. Caching Messaging Resources

The standard API usage of NMS and other JMS inspired APIs involves creating many intermediate objects. To send a message the following 'API' walk is performed

IConnectionFactory->IConnection->ISession->IMessageProducer->Send

Between the ConnectionFactory and the Send operation there are three intermediate objects that are created and destroyed. To optimise the resource usage and increase performance two implementations of IConnectionFactory are provided.

31.2.3.1. SingleConnectionFactory

Spring.Messaging.Nms.Connections.SingleConnectionFactory will return the same connection on all calls to CreateConnection and ignore calls to Close.
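A possible object definition is sketched below. It assumes that SingleConnectionFactory is configured through a TargetConnectionFactory property in the same way as the CachingConnectionFactory example in the next section; the object id is purely illustrative.

```xml
<!-- Sketch only: wraps the ActiveMQ factory so that every CreateConnection call returns one shared connection -->
<object id="singleConnectionFactory" type="Spring.Messaging.Nms.Connections.SingleConnectionFactory, Spring.Messaging.Nms">
  <property name="TargetConnectionFactory">
    <object type="Apache.NMS.ActiveMQ.ConnectionFactory, Apache.NMS.ActiveMQ">
      <constructor-arg index="0" value="tcp://localhost:61616"/>
    </object>
  </property>
</object>
```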

31.2.3.2. CachingConnectionFactory

Spring.Messaging.Nms.Connections.CachingConnectionFactory extends the functionality of SingleConnectionFactory and adds the caching of Sessions, MessageProducers, and MessageConsumers.

The initial cache size is 1; use the property SessionCacheSize to increase the number of cached sessions. Note that the actual number of cached sessions can be higher than that number because sessions are cached based on their acknowledgment mode, so there can be up to four cached session instances when SessionCacheSize is set to one, one for each AcknowledgementMode. MessageProducers and MessageConsumers are cached within their owning session, and their unique properties are also taken into account when caching.

MessageProducers are cached based on their destination. MessageConsumers are cached based on a key composed of the destination, selector, noLocal delivery flag, and the durable subscription name (if creating durable consumers).

Here is an example configuration

  <object id="connectionFactory" type="Spring.Messaging.Nms.Connections.CachingConnectionFactory, Spring.Messaging.Nms">
    <property name="SessionCacheSize" value="10" />
    <property name="TargetConnectionFactory">
      <object type="Apache.NMS.ActiveMQ.ConnectionFactory, Apache.NMS.ActiveMQ">
        <constructor-arg index="0" value="tcp://localhost:61616"/>
      </object>
    </property>
  </object>

31.2.4. Dynamic Destination Management

In Java implementations of JMS, Connections and Destinations are 'administered objects' accessible through JNDI, a directory service much like Active Directory. In .NET each vendor has selected a different approach to destination management. Some are JNDI inspired, allowing you to retrieve Connections and Destinations that were configured administratively. You can use these vendor specific APIs to perform dependency injection on references to JMS Destination objects in Spring's XML configuration file by creating an implementation of IObjectFactory or, alternatively, by configuring the specific concrete class implementation for a messaging provider.

However, this approach of administered objects can be quite cumbersome if there are a large number of destinations in the application or if there are advanced destination management features unique to the messaging provider. Examples of such advanced destination management would be the creation of dynamic destinations or support for a hierarchical namespace of destinations. The NmsTemplate delegates the resolution of a destination name to a destination object to an implementation of the interface IDestinationResolver. DynamicDestinationResolver is the default implementation used by NmsTemplate and accommodates resolving dynamic destinations.

Quite often the destinations used in a messaging application are only known at runtime and therefore cannot be administratively created when the application is deployed. This is often because there is shared application logic between interacting system components that create destinations at runtime according to a well-known naming convention. Even though the creation of dynamic destinations is not part of the original JMS specification, most vendors have provided this functionality. Dynamic destinations are created with a name defined by the user, which differentiates them from temporary destinations, and are often not registered in a directory service. The API used to create dynamic destinations varies from provider to provider since the properties associated with the destination are vendor specific. However, a simple implementation choice that is sometimes made by vendors is to use the TopicSession method CreateTopic(string topicName) or the QueueSession method CreateQueue(string queueName) to create a new destination with default destination properties. Depending on the vendor implementation, DynamicDestinationResolver may then also create a physical destination instead of only resolving one.

The boolean property PubSubDomain is used to configure the NmsTemplate with knowledge of what messaging 'domain' is being used. By default the value of this property is false, indicating that the point-to-point domain, Queues, will be used. This property is infrequently used as the provider messaging APIs are now largely agnostic as to which messaging 'domain' is used, referring to 'Destinations' rather than 'Queues' or 'Topics'. However, this property does influence the behavior of dynamic destination resolution via implementations of the IDestinationResolver interface.

You can also configure the NmsTemplate with a default destination via the property DefaultDestination. The default destination will be used with send and receive operations that do not refer to a specific destination.
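The sketch below shows how PubSubDomain might be combined with name based destination resolution: the destination is given as a string and resolved by the template's IDestinationResolver when the message is sent. The class name TopicPublisher is hypothetical, and the broker URL and destination name are reused from the other examples in this chapter.

```csharp
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Spring.Messaging.Nms.Core;

// Hypothetical publisher class used only for illustration.
public class TopicPublisher
{
    private readonly NmsTemplate template;

    public TopicPublisher()
    {
        template = new NmsTemplate(new ConnectionFactory("tcp://localhost:61616"));

        // Resolve destination names as topics rather than queues
        // (the default value, false, selects the point-to-point domain).
        template.PubSubDomain = true;
    }

    public void Publish(string text)
    {
        // The string name is handed to the configured IDestinationResolver,
        // which by default is DynamicDestinationResolver.
        template.SendWithDelegate("APP.STOCK.MARKETDATA",
            delegate(ISession session)
            {
                return session.CreateTextMessage(text);
            });
    }
}
```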

31.2.5. Message Listener Containers

One of the most common uses of JMS is to concurrently process messages delivered asynchronously. A message listener container is used to receive messages from a message queue and drive the IMessageListener that is injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between a Message-Driven POCO (MDP) and a messaging provider, and takes care of registering to receive messages, resource acquisition and release, exception conversion, and so on. This allows you as an application developer to write the (possibly complex) business logic associated with receiving a message (and possibly responding to it), and to delegate boilerplate messaging infrastructure concerns to the framework.

A subclass of AbstractMessageListenerContainer is used to receive messages from JMS and drive the Message-Driven POCOs (MDPs) that are injected into it. There is one subclass of AbstractMessageListenerContainer packaged with Spring: SimpleMessageListenerContainer. Additional subclasses, in particular to participate in distributed transactions (if the provider supports it), will be provided in future releases. SimpleMessageListenerContainer creates a fixed number of JMS sessions at startup and uses them throughout the lifespan of the container.

Creating and configuring an ActiveMQ message listener container is described in Section 31.5.2, “Asynchronous Reception”.

31.2.6. Transaction Management

Spring provides an implementation of the IPlatformTransactionManager interface for managing ActiveMQ messaging transactions. The class is NmsTransactionManager and it manages transactions for a single ConnectionFactory. This allows messaging applications to leverage the managed transaction features of Spring as described in Chapter 17, Transaction management. The NmsTransactionManager performs local resource transactions, binding a Connection/Session pair from the specified ConnectionFactory to the thread. NmsTemplate automatically detects such transactional resources and operates on them accordingly.

Using Spring's SingleConnectionFactory will result in a shared Connection, with each transaction having its own independent Session.
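A transaction manager definition could look like the sketch below. The namespace Spring.Messaging.Nms.Connections and the property name ConnectionFactory are assumptions to verify against the API documentation, and the referenced id connectionFactory is taken from the CachingConnectionFactory example earlier in this chapter.

```xml
<!-- Sketch only: manages local messaging transactions for a single ConnectionFactory -->
<object id="transactionManager" type="Spring.Messaging.Nms.Connections.NmsTransactionManager, Spring.Messaging.Nms">
  <property name="ConnectionFactory" ref="connectionFactory"/>
</object>
```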

31.3. Sending a Message

The NmsTemplate contains three convenience methods to send a message. The methods are listed below.

  • void Send(IDestination destination, IMessageCreator messageCreator)

  • void Send(string destinationName, IMessageCreator messageCreator)

  • void Send(IMessageCreator messageCreator)

The methods differ in how the destination is specified. In the first case the JMS Destination object is specified directly. The second case specifies the destination using a string that is then resolved to a messaging Destination object using the IDestinationResolver associated with the template. The last method sends the message to the destination specified by NmsTemplate's DefaultDestination property.

All methods take as an argument an instance of IMessageCreator, which defines the API contract for you to create the JMS message. The interface is shown below.

public interface IMessageCreator {        
  IMessage CreateMessage(ISession session);
}
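A minimal implementation of this interface might look like the following sketch. The class name TextMessageCreator is hypothetical; it simply wraps a string in a TextMessage created from the supplied Session.

```csharp
using Apache.NMS;
using Spring.Messaging.Nms.Core;

// Hypothetical IMessageCreator that wraps a string in a text message.
public class TextMessageCreator : IMessageCreator
{
    private readonly string text;

    public TextMessageCreator(string text)
    {
        this.text = text;
    }

    public IMessage CreateMessage(ISession session)
    {
        // The session is supplied by NmsTemplate for the duration of the send.
        return session.CreateTextMessage(text);
    }
}
```

An instance can then be passed to any of the Send methods, for example template.Send("APP.STOCK.MARKETDATA", new TextMessageCreator("hello")).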

Intermediate Sessions and MessageProducers needed to send the message are managed by NmsTemplate. The session passed in to the method is never null. There is a similar set of methods that use a delegate instead of the interface, which can be convenient when writing a small implementation in .NET 2.0 using anonymous delegates. Larger, more complex implementations of the method 'CreateMessage' are better suited to an interface based implementation.

  • void SendWithDelegate(IDestination destination, MessageCreatorDelegate messageCreatorDelegate)

  • void SendWithDelegate(string destinationName, MessageCreatorDelegate messageCreatorDelegate)

  • void SendWithDelegate(MessageCreatorDelegate messageCreatorDelegate)

The declaration of the delegate is

public delegate IMessage MessageCreatorDelegate(ISession session);

The following class shows how to use the SendWithDelegate method with an anonymous delegate to create a MapMessage from the supplied Session object. The use of the anonymous delegate allows for very terse syntax and easy access to local variables. The NmsTemplate is constructed by passing a reference to a ConnectionFactory.

    public class SimplePublisher
    {
        private NmsTemplate template;
        
        public SimplePublisher()
        {
            template = new NmsTemplate(new ConnectionFactory("tcp://localhost:61616"));
        }
        
        public void Publish(string ticker, double price)
        {
            template.SendWithDelegate("APP.STOCK.MARKETDATA",
                          delegate(ISession session)
                          {
                              IMapMessage message = session.CreateMapMessage();
                              message.Body.SetString("TICKER", ticker);
                              message.Body.SetDouble("PRICE", price);
                              message.NMSPriority = MsgPriority.Low;
                              return message;
                          });
        }
    }


A zero argument constructor and ConnectionFactory property are also provided. Alternatively consider deriving from Spring's NmsGatewaySupport convenience base class which provides a ConnectionFactory property that will instantiate a NmsTemplate instance that is made available via the property NmsTemplate.
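The following sketch shows what the gateway based variant of SimplePublisher could look like. The class name GatewayPublisher is hypothetical, and it is assumed that NmsGatewaySupport lives in the Spring.Messaging.Nms.Core namespace and that its ConnectionFactory property has been set (for example through dependency injection) before Publish is called.

```csharp
using Apache.NMS;
using Spring.Messaging.Nms.Core;

// Hypothetical publisher built on the NmsGatewaySupport base class.
public class GatewayPublisher : NmsGatewaySupport
{
    public void Publish(string ticker, double price)
    {
        // NmsTemplate is the property inherited from NmsGatewaySupport; it is
        // available once the ConnectionFactory property has been configured.
        NmsTemplate.SendWithDelegate("APP.STOCK.MARKETDATA",
            delegate(ISession session)
            {
                IMapMessage message = session.CreateMapMessage();
                message.Body.SetString("TICKER", ticker);
                message.Body.SetDouble("PRICE", price);
                return message;
            });
    }
}
```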

31.3.1. Using MessageConverters

In order to facilitate the sending of domain model objects, the NmsTemplate has various send methods that take a .NET object as an argument for a message's data content. The overloaded methods ConvertAndSend and ReceiveAndConvert in NmsTemplate delegate the conversion process to an instance of the IMessageConverter interface. This interface defines a simple contract to convert between .NET objects and JMS messages. The default implementation, SimpleMessageConverter, supports conversion between String and TextMessage, byte[] and BytesMessage, and System.Collections.IDictionary and MapMessage. By using the converter, your application code can focus on the business object that is being sent or received via messaging and not be concerned with the details of how it is represented as a JMS message. There is also an XmlMessageConverter that converts objects to an XML string and vice-versa for sending via a TextMessage. Please refer to the API documentation and example application for more information on configuring an XmlMessageConverter.
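To give an idea of what a custom converter involves, here is a sketch that maps a domain object to a MapMessage and back. The Quote and QuoteMessageConverter classes are hypothetical, and the method names ToMessage and FromMessage are assumed from the shape of the interface rather than quoted from this chapter, so check them against the API documentation.

```csharp
using System;
using Apache.NMS;
using Spring.Messaging.Nms.Support.Converter;

// Hypothetical domain class used only for this illustration.
public class Quote
{
    public string Ticker;
    public double Price;
}

// Hypothetical converter between Quote objects and map messages.
public class QuoteMessageConverter : IMessageConverter
{
    // Assumed interface method: domain object -> provider message.
    public IMessage ToMessage(object objectToConvert, ISession session)
    {
        Quote quote = objectToConvert as Quote;
        if (quote == null)
        {
            throw new ArgumentException("Expected a Quote instance", "objectToConvert");
        }
        IMapMessage message = session.CreateMapMessage();
        message.Body.SetString("TICKER", quote.Ticker);
        message.Body.SetDouble("PRICE", quote.Price);
        return message;
    }

    // Assumed interface method: provider message -> domain object.
    public object FromMessage(IMessage messageToConvert)
    {
        IMapMessage mapMessage = messageToConvert as IMapMessage;
        if (mapMessage == null)
        {
            throw new ArgumentException("Expected a map message", "messageToConvert");
        }
        Quote quote = new Quote();
        quote.Ticker = mapMessage.Body.GetString("TICKER");
        quote.Price = mapMessage.Body.GetDouble("PRICE");
        return quote;
    }
}
```

The converter would be assigned to the template's MessageConverter property, after which ConvertAndSend and ReceiveAndConvert work with Quote instances directly.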

The family of ConvertAndSend methods is similar to that of the Send method, with additional overloads that take an argument of type IMessagePostProcessor. These methods are listed below.

  • void ConvertAndSend(object message)

  • void ConvertAndSend(object message, IMessagePostProcessor postProcessor)

  • void ConvertAndSend(string destinationName, object message)

  • void ConvertAndSend(string destinationName, object message, IMessagePostProcessor postProcessor)

  • void ConvertAndSend(IDestination destination, object message)

  • void ConvertAndSend(IDestination destination, object message, IMessagePostProcessor postProcessor)

The example below uses the default message converter to send a Hashtable as a message to the destination "APP.STOCK.MARKETDATA".

public void PublishUsingDict(string ticker, double price)
{
  IDictionary marketData = new Hashtable();
  marketData.Add("TICKER", ticker);
  marketData.Add("PRICE", price);
  template.ConvertAndSend("APP.STOCK.MARKETDATA", marketData);
}

To accommodate the setting of a message's properties, headers, and body that cannot generally be encapsulated inside a converter class, the IMessagePostProcessor interface gives you access to the message after it has been converted but before it is sent. The ConvertAndSendWithDelegate methods allow for the use of a delegate to perform message post processing, and the example further below demonstrates how to modify a message header and a property after a Hashtable is converted to a message. This family of methods is listed below.

  • void ConvertAndSendWithDelegate(object message, MessagePostProcessorDelegate postProcessor)

  • void ConvertAndSendWithDelegate(IDestination destination, object message, MessagePostProcessorDelegate postProcessor)

  • void ConvertAndSendWithDelegate(string destinationName, object message, MessagePostProcessorDelegate postProcessor)

The declaration of the delegate is

public delegate IMessage MessagePostProcessorDelegate(IMessage message);

The following code shows this in action.

public void PublishUsingDict(string ticker, double price)
{
  IDictionary marketData = new Hashtable();
  marketData.Add("TICKER", ticker);
  marketData.Add("PRICE", price);
  template.ConvertAndSendWithDelegate("APP.STOCK.MARKETDATA", marketData, 
           delegate(IMessage message)
           { 
             message.NMSPriority = MsgPriority.Low;
             message.NMSCorrelationID = Guid.NewGuid().ToString();
             return message;
           });
}

31.4. Session and Producer Callback

While the send operations cover many common usage scenarios, there are cases when you want to perform multiple operations on a JMS Session or MessageProducer. The SessionCallback and ProducerCallback expose the Session and the Session / MessageProducer pair respectively. The Execute() methods on NmsTemplate execute these callback methods.

  • public object Execute(IProducerCallback action)

  • public object Execute(ProducerDelegate action)

  • public object Execute(ISessionCallback action)

  • public object Execute(SessionDelegate action)

Where ISessionCallback and IProducerCallback are

public interface IProducerCallback
{
    object DoInNms(ISession session, IMessageProducer producer);
}

and

public interface ISessionCallback
{
    object DoInNms(ISession session);
}

The delegate signatures are listed below and mirror the interface method signature

public delegate object SessionDelegate(ISession session);

public delegate object ProducerDelegate(ISession session, IMessageProducer producer);
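As an illustration, the sketch below uses the SessionDelegate overload of Execute to send several messages within one Session. The class name BatchPublisher and the queue name are hypothetical. The callback creates its own MessageProducer so that the example does not depend on how the template would configure one.

```csharp
using Apache.NMS;
using Spring.Messaging.Nms.Core;

// Hypothetical class that sends a batch of messages on a single session.
public class BatchPublisher
{
    private readonly NmsTemplate template;

    public BatchPublisher(NmsTemplate template)
    {
        this.template = template;
    }

    public void PublishAll(string[] tickers)
    {
        template.Execute(delegate(ISession session)
        {
            IDestination destination = session.GetQueue("APP.STOCK.MARKETDATA");
            IMessageProducer producer = session.CreateProducer(destination);
            try
            {
                foreach (string ticker in tickers)
                {
                    producer.Send(session.CreateTextMessage(ticker));
                }
            }
            finally
            {
                // Release the producer created inside the callback.
                producer.Close();
            }
            // The return value is handed back to the caller of Execute.
            return tickers.Length;
        });
    }
}
```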

31.5. Receiving a message

31.5.1. Synchronous Reception

While messaging middleware is typically associated with asynchronous processing, it is possible to consume messages synchronously. The overloaded Receive(..) methods on NmsTemplate provide this functionality. During a synchronous receive, the calling thread blocks until a message becomes available. This can be a dangerous operation since the calling thread can potentially be blocked indefinitely. The property ReceiveTimeout on NmsTemplate specifies how long the receiver should wait before giving up waiting for a message.

The Receive methods are listed below

  • public IMessage Receive()

  • public IMessage Receive(IDestination destination)

  • public IMessage Receive(string destinationName)

  • public IMessage ReceiveSelected(string messageSelector)

  • public IMessage ReceiveSelected(string destinationName, string messageSelector)

  • public IMessage ReceiveSelected(IDestination destination, string messageSelector)

The Receive method without arguments will use the DefaultDestination. The ReceiveSelected methods apply the provided message selector string to the MessageConsumer that is created.

The ReceiveAndConvert methods apply the template's message converter when receiving a message. The message converter to use is set using the property MessageConverter and is the SimpleMessageConverter implementation by default. These methods are listed below.

  • public object ReceiveAndConvert()

  • public object ReceiveAndConvert(IDestination destination)

  • public object ReceiveAndConvert(string destinationName)

  • public object ReceiveSelectedAndConvert(string messageSelector)

  • public object ReceiveSelectedAndConvert(string destinationName, string messageSelector)

  • public object ReceiveSelectedAndConvert(IDestination destination, string messageSelector)
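A synchronous consumer built on these methods could be sketched as follows. The class name QuoteReader is hypothetical. It is assumed that the template was configured with a finite ReceiveTimeout and that a receive which times out yields null, so the caller must be prepared for a null result.

```csharp
using System.Collections;
using Spring.Messaging.Nms.Core;

// Hypothetical reader that pulls one converted message at a time.
public class QuoteReader
{
    private readonly NmsTemplate template;

    // The template is expected to be configured elsewhere, including its
    // ReceiveTimeout, so that the call below cannot block indefinitely.
    public QuoteReader(NmsTemplate template)
    {
        this.template = template;
    }

    public IDictionary ReadNext()
    {
        // With the default SimpleMessageConverter a MapMessage is converted
        // to an IDictionary; the cast yields null for any other payload or
        // (by assumption) when no message arrived within the timeout.
        object payload = template.ReceiveAndConvert("APP.STOCK.MARKETDATA");
        return payload as IDictionary;
    }
}
```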

31.5.2. Asynchronous Reception

Asynchronous reception of messages occurs by the messaging provider invoking a callback function. This is commonly an interface such as the IMessageListener interface shown below, taken from the TIBCO EMS provider.

public interface IMessageListener
{
      void OnMessage(Message message);
}

Other vendors may provide a delegate based version of this callback, or even both delegate and interface options. Apache ActiveMQ supports the use of delegates for message reception callbacks. As a programming convenience, Spring.Messaging.Nms.Core provides an IMessageListener interface that can be used with NMS.

Below is a simple implementation of the IMessageListener interface that processes a message.

using Spring.Messaging.Nms.Core;
using Apache.NMS;
using Common.Logging;

namespace MyApp
{
  public class SimpleMessageListener : IMessageListener
  {
        private static readonly ILog LOG = LogManager.GetLogger(typeof(SimpleMessageListener));

        private int messageCount;

        public int MessageCount
        {
            get { return messageCount; }
        }

        public void OnMessage(IMessage message)
        {           
            messageCount++;
            LOG.Debug("Message listener count = " + messageCount);
            ITextMessage textMessage = message as ITextMessage;
            if (textMessage != null)
            {
                LOG.Info("Message Text = " + textMessage.Text);
            } else
            {
                LOG.Warn("Can not process message of type " + message.GetType());
            }
        }
  }
}

Once you've implemented your message listener, it's time to create a message listener container.

You register your listener with a message listener container that specifies various messaging configuration parameters, such as the ConnectionFactory and the number of concurrent consumers to create. There is an abstract base class for message listener containers, AbstractMessageListenerContainer, and one concrete implementation, SimpleMessageListenerContainer. SimpleMessageListenerContainer creates a fixed number of JMS Session/MessageConsumer pairs as set by the property ConcurrentConsumers. The default value of ConcurrentConsumers is one. Here is a sample configuration that uses the custom schema provided in Spring.NET to more easily configure MessageListenerContainers.

<objects xmlns="http://www.springframework.net"
         xmlns:nms="http://www.springframework.net/nms">

  <object id="ActiveMqConnectionFactory" type="Apache.NMS.ActiveMQ.ConnectionFactory, Apache.NMS.ActiveMQ">
    <constructor-arg index="0" value="tcp://localhost:61616"/>
  </object>

  <object id="ConnectionFactory" type="Spring.Messaging.Nms.Connections.CachingConnectionFactory, Spring.Messaging.Nms">
    <constructor-arg index="0" ref="ActiveMqConnectionFactory"/>
    <property name="SessionCacheSize" value="10"/>
  </object>

  <object id="MyMessageListener" type="MyApp.SimpleMessageListener, MyApp"/>

  <nms:listener-container connection-factory="ConnectionFactory" concurrency="10">
    <nms:listener ref="MyMessageListener" destination="APP.STOCK.REQUEST" />
  </nms:listener-container>

</objects>

The above configuration will create 10 threads that process messages from the queue named "APP.STOCK.REQUEST". The threads are those owned by the messaging provider as a result of creating a MessageConsumer. Other important properties are ClientID, which sets the ClientID of the Connection, and MessageSelector, which specifies the 'sql-like' message selector string. Durable subscriptions are supported via the properties SubscriptionDurable and DurableSubscriptionName. You may also register an exception listener using the property ExceptionListener.

Exceptions that are thrown during message processing can be passed to an implementation of IExceptionListener that is registered with the container via the property ExceptionListener. The registered IExceptionListener will be invoked if the exception is of the type NMSException (or the equivalent root exception type for other providers). The SimpleMessageListenerContainer logs the exception at error level and does not propagate the exception to the provider. All handling of acknowledgement and/or transactions is done by the listener container. You can override the method HandleListenerException to change this behavior.

Please refer to the Spring SDK documentation for additional description of the features and properties of SimpleMessageListenerContainer.

31.5.3. The ISessionAwareMessageListener interface

The ISessionAwareMessageListener interface is a Spring-specific interface that provides a similar contract to the messaging provider's IMessageListener interface or Listener delegate/event, but also provides the message handling method with access to the Session from which the Message was received.

public interface ISessionAwareMessageListener
{        
   void OnMessage(IMessage message, ISession session);
}

You can also choose to implement this interface and register it with the message listener container.
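As an illustration of why access to the session is useful, the sketch below shows a listener that sends a reply on the same session that delivered the request. It is a minimal example under stated assumptions, not code from the Spring.NET distribution: the class name EchoReplyListener is hypothetical, the interface is assumed to live in the Spring.Messaging.Nms.Listener namespace, and the request is assumed to be a text message that carries a reply-to destination.

```csharp
using Apache.NMS;
using Spring.Messaging.Nms.Listener; // assumed namespace of ISessionAwareMessageListener

namespace MyApp
{
    // Hypothetical listener that echoes text messages back to the sender.
    public class EchoReplyListener : ISessionAwareMessageListener
    {
        public void OnMessage(IMessage message, ISession session)
        {
            ITextMessage request = message as ITextMessage;
            if (request == null || message.NMSReplyTo == null)
            {
                // Not a text message, or nowhere to send the reply.
                return;
            }

            ITextMessage reply = session.CreateTextMessage("Echo: " + request.Text);
            // Lets the requester match the reply to its request.
            reply.NMSCorrelationID = request.NMSMessageId;

            // The producer is created on the session that received the message,
            // so the reply shares that session's acknowledgement/transaction settings.
            using (IMessageProducer producer = session.CreateProducer(message.NMSReplyTo))
            {
                producer.Send(reply);
            }
        }
    }
}
```

Such a listener would be registered with the container in the same way as an IMessageListener implementation.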

31.5.4. MessageListenerAdapter

The MessageListenerAdapter class is the final component in Spring's asynchronous messaging support: in a nutshell, it allows you to expose almost any class to be invoked as a messaging callback (there are of course some constraints).

Consider the following interface definition. Notice that although the interface extends neither the IMessageListener nor the ISessionAwareMessageListener interface, it can still be used as a Message-Driven POCO (MDP) via the MessageListenerAdapter class. Notice also how the various message handling methods are strongly typed according to the contents of the various Message types that they can receive and handle.

public interface IMessageHandler {

    void HandleMessage(string message);

    void HandleMessage(Hashtable message);

    void HandleMessage(byte[] message);

}

and a class that implements this interface...

public class DefaultMessageHandler : IMessageHandler {
    // stub implementations elided for brevity...
}

In particular, note how the above implementation of the IMessageHandler interface (the above DefaultMessageHandler class) has no messaging provider API dependencies at all. It truly is a POCO that we will make into an MDP via the following configuration.

<object id="MessageHandler" type="MyApp.DefaultMessageHandler, MyApp"/>

<object id="MessageListenerAdapter" type="Spring.Messaging.Nms.Listener.Adapter.MessageListenerAdapter, Spring.Messaging.Nms">
  <property name="HandlerObject" ref="MessageHandler"/>
</object>

<object id="MessageListenerContainer" type="Spring.Messaging.Nms.Listener.SimpleMessageListenerContainer, Spring.Messaging.Nms">
  <property name="ConnectionFactory" ref="ConnectionFactory"/>
  <property name="DestinationName" value="APP.REQUEST"/>
  <property name="MessageListener" ref="MessageListenerAdapter"/>
</object>

The previous example relies on the fact that the default IMessageConverter implementation of the MessageListenerAdapter is SimpleMessageConverter, which converts an ITextMessage, IBytesMessage, IMapMessage, and IObjectMessage to a string, byte[], Hashtable, and object respectively.
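To make the pairing between message types and handler overloads concrete, here is a minimal sketch of a handler with one overload per converted payload type. The class name DescribingMessageHandler and its LastDescription property are hypothetical and exist only to make the effect of each call visible. The class has no reference to Apache.NMS or Spring.

```csharp
using System.Collections;

namespace MyApp
{
    // Hypothetical POCO handler with the same method shapes as the interface above.
    public class DescribingMessageHandler
    {
        // Records what the most recent call received.
        public string LastDescription { get; private set; }

        // Would receive the converted payload of an ITextMessage.
        public void HandleMessage(string message)
        {
            LastDescription = "text: " + message;
        }

        // Would receive the converted payload of an IMapMessage.
        public void HandleMessage(Hashtable message)
        {
            LastDescription = "map with " + message.Count + " entries";
        }

        // Would receive the converted payload of an IBytesMessage.
        public void HandleMessage(byte[] message)
        {
            LastDescription = "bytes: " + message.Length;
        }
    }
}
```

Because the handler only sees plain .NET types, it can be exercised in a unit test without a broker.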

Below is an example of another MDP that can only handle the receiving of NMS ITextMessage messages. Notice how the message handling method is actually called 'Receive' (the name of the message handling method in a MessageListenerAdapter defaults to 'HandleMessage'), but it is configurable (as you will see below). Notice also how the 'Receive(..)' method is strongly typed to receive and respond only to NMS ITextMessage messages.

public interface ITextMessageHandler {

    void Receive(ITextMessage message);
}

public class TextMessageHandler : ITextMessageHandler {
    // implementation elided for clarity...
}

The configuration of the attendant MessageListenerAdapter would look like this:

<object id="TextMessageHandler" type="MyApp.TextMessageHandler, MyApp"/>

<object id="MessageListenerAdapter" type="Spring.Messaging.Nms.Listener.Adapter.MessageListenerAdapter, Spring.Messaging.Nms">
    <property name="HandlerObject" ref="TextMessageHandler"/>
    <property name="DefaultHandlerMethod" value="Receive"/>
    <!-- we don't want automatic message context extraction -->
    <property name="MessageConverter">
        <null/>
    </property>
</object>

Please note that if the above 'MessageListener' receives a Message of a type other than ITextMessage, a ListenerExecutionFailedException will be thrown (and subsequently handled by the container by logging the exception).

If your IMessageConverter implementation will return multiple object types, overloading the handler method is perfectly acceptable; the most specific matching method will be used. A method with an object signature would be considered a 'catch-all' method of last resort. For example, you can have a handler interface as shown below.

public interface IMyHandler
{
   void DoWork(string text);
   void DoWork(OrderRequest orderRequest);
   void DoWork(InvoiceRequest invoiceRequest);
   void DoWork(object obj);
}

Another capability of the MessageListenerAdapter class is the ability to automatically send back a response Message if a handler method returns a non-void value. The adapter's message converter will be used to convert the method's return value to a message. The resulting message will then be sent to the Destination defined in the Reply-To property of the original Message (if one exists), or to the default Destination set on the MessageListenerAdapter (if one has been configured). If no Destination is found then an InvalidDestinationException will be thrown (and please note that this exception will not be swallowed and will propagate up the call stack).

An interface that is typical when used with a message converter that supports multiple object types and has return values is shown below.

public interface IMyHandler
{
   string DoWork(string text);
   OrderResponse DoWork(OrderRequest orderRequest);
   InvoiceResponse DoWork(InvoiceRequest invoiceRequest);
   void DoWork(object obj);
}
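A handler of this shape might be implemented along the following lines. This is only a sketch: OrderRequest and OrderResponse are hypothetical stand-ins defined here so that the example compiles, the invoice overload is omitted for brevity, and a message converter able to map these types to and from messages is assumed to be configured on the adapter.

```csharp
namespace MyApp
{
    // Hypothetical request/response types, defined only for this sketch.
    public class OrderRequest
    {
        public string Symbol;
        public int Quantity;
    }

    public class OrderResponse
    {
        public string Confirmation;
    }

    public class OrderHandler
    {
        // Non-void: the returned string would be converted into a reply message.
        public string DoWork(string text)
        {
            return "ACK " + text;
        }

        // Non-void: the returned object would be converted into a reply message.
        public OrderResponse DoWork(OrderRequest orderRequest)
        {
            OrderResponse response = new OrderResponse();
            response.Confirmation = orderRequest.Quantity + " x " + orderRequest.Symbol;
            return response;
        }

        // Catch-all of last resort; it returns void, so no reply is produced.
        public void DoWork(object obj)
        {
        }
    }
}
```

Keeping the return types specific to each request type lets the reply payload be decided by the business code rather than by messaging code.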

31.5.5. Processing messages within a messaging transaction

Invoking a message listener within a transaction only requires reconfiguration of the listener container. Local message transactions can be activated by setting the property SessionAcknowledgeMode, which for NMS is of the enum type AcknowledgementMode, to AcknowledgementMode.Transactional. Each message listener invocation will then operate within an active messaging transaction, with message reception rolled back in case of listener execution failure.
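For readers who build the container in code rather than in XML, the setting described above could look roughly like this. The helper class TransactedContainerFactory is hypothetical, the destination name is reused from the earlier adapter example, and the property types are assumed from the object definitions shown in this chapter. Treat it as a sketch rather than a definitive recipe.

```csharp
using Apache.NMS;
using Spring.Messaging.Nms.Listener;

namespace MyApp
{
    // Hypothetical helper that configures a locally transacted listener container.
    public static class TransactedContainerFactory
    {
        // The caller supplies the connection factory and the listener object.
        public static SimpleMessageListenerContainer Create(
            IConnectionFactory connectionFactory, object listener)
        {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.ConnectionFactory = connectionFactory;
            container.DestinationName = "APP.REQUEST";
            container.MessageListener = listener;
            // Each listener invocation now runs inside a local messaging transaction.
            container.SessionAcknowledgeMode = AcknowledgementMode.Transactional;
            return container;
        }
    }
}
```

In an XML-configured application the same effect comes from setting the SessionAcknowledgeMode property on the container's object definition.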

Sending a response message (via ISessionAwareMessageListener) will be part of the same local transaction, but any other resource operations (such as database access) will operate independently. This usually requires duplicate message detection in the listener implementation, covering the case where database processing has committed but message processing failed to commit. See the discussion on the ActiveMQ web site for more information on combining local database and messaging transactions.
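One way to picture duplicate message detection is a small record of message ids that have already been handled. The class below is a hypothetical, in-memory sketch. In a real application the id (for example the value of the message's NMSMessageId property) would more likely be stored in the database as part of the same database transaction as the business update, so that the record survives restarts.

```csharp
using System.Collections.Generic;

namespace MyApp
{
    // Hypothetical in-memory record of processed message ids.
    public class ProcessedMessageLog
    {
        private readonly HashSet<string> seen = new HashSet<string>();
        private readonly object sync = new object();

        // Returns true the first time an id is offered and false on any repeat.
        public bool TryMarkProcessed(string messageId)
        {
            lock (sync)
            {
                return seen.Add(messageId);
            }
        }
    }
}
```

A listener would call TryMarkProcessed before doing its database work and skip the work when the method returns false, which covers a redelivery after the messaging transaction was rolled back.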

31.5.6. Messaging Namespace support

To use the NMS namespace elements you will need to reference the NMS schema. When using TIBCO EMS you should refer to the TIBCO EMS Schema. For information on how to set this up refer to Section B.2.7, “The nms messaging schema”. The namespace consists of one top-level element, <listener-container/>, which can contain one or more <listener/> child elements. Here is an example of a basic configuration for two listeners.

<nms:listener-container>

    <nms:listener destination="queue.orders" ref="OrderService" method="PlaceOrder"/>

    <nms:listener destination="queue.confirmations" ref="ConfirmationLogger" method="Log"/>

</nms:listener-container>

The example above is equivalent to creating two distinct listener container object definitions and two distinct MessageListenerAdapter object definitions as demonstrated in Section 31.5.4, “MessageListenerAdapter”. In addition to the attributes shown above, the listener element may contain several optional ones. The following table describes all available attributes:

Table 31.1. Attributes of the NMS <listener> element
Attribute / Description
id

An object name for the hosting listener container. If not specified, an object name will be automatically generated.

destination (required)

The destination name for this listener, resolved through the IDestinationResolver strategy.

ref (required)

The object name of the handler object.

method

The name of the handler method to invoke. If the ref points to an IMessageListener or Spring ISessionAwareMessageListener, this attribute may be omitted.

response-destination

The name of the default response destination to send response messages to. This will be applied in case of a request message that does not carry a "NMSReplyTo" field. The type of this destination will be determined by the listener-container's "destination-type" attribute. Note: This only applies to a listener method with a return value, for which each result object will be converted into a response message.

subscription

The name of the durable subscription, if any.

selector

An optional message selector for this listener.

pubsub-domain

An optional boolean value. Set to true for the publish-subscribe domain (Topics) or false (the default) for point-to-point domain (Queues). This is useful when using the default implementation for destination resolvers.


The <listener-container/> element also accepts several optional attributes. This allows for customization of the various strategies (for example, DestinationResolver) as well as basic messaging settings and resource references. Using these attributes, it is possible to define highly-customized listener containers while still benefiting from the convenience of the namespace.

<nms:listener-container connection-factory="MyConnectionFactory"
                        destination-resolver="MyDestinationResolver"                        
                        concurrency="10">

    <nms:listener destination="queue.orders" ref="OrderService" method="PlaceOrder"/>

    <nms:listener destination="queue.confirmations" ref="ConfirmationLogger" method="Log"/>

</nms:listener-container>

The following table describes all available attributes. Consult the class-level SDK documentation of the AbstractMessageListenerContainer and its subclass SimpleMessageListenerContainer for more detail on the individual properties.

Table 31.2. Attributes of the NMS <listener-container> element
Attribute / Description
connection-factory

A reference to the NMS ConnectionFactory object (the default object name is 'ConnectionFactory').

destination-resolver

A reference to the IDestinationResolver strategy for resolving NMS Destinations.

message-converter

A reference to the IMessageConverter strategy for converting NMS Messages to listener method arguments. Default is a SimpleMessageConverter.

destination-type

The NMS destination type for this listener: queue, topic or durableTopic. The default is queue.

client-id

The NMS client id for this listener container. Needs to be specified when using durable subscriptions.

acknowledge

The native NMS acknowledge mode: auto, client, dups-ok or transacted. A value of transacted activates a locally transacted Session. As an alternative, specify the transaction-manager attribute described below. Default is auto.

concurrency

The number of concurrent sessions/consumers to start for each listener. Default is 1; keep concurrency limited to 1 in case of a topic listener or if queue ordering is important; consider raising it for general queues.

recovery-interval

The time interval between connection recovery attempts. The default is 5 seconds. Specify as a TimeSpan value using Spring's TimeSpanConverter (for example 10s, 10m, or 3h).

max-recovery-time

The maximum time to keep trying to reconnect. The default is 10 minutes. Specify as a TimeSpan value using Spring's TimeSpanConverter (for example 10s, 10m, or 3h).

auto-startup

Set whether to automatically start the listeners after initialization. Default is true, optionally set to false.

error-handler

A reference to an IErrorHandler that will handle any uncaught exceptions that may occur during the execution of the message listener, other than those of the type NMSException (in the case of ActiveMQ) or EMSException (in the case of TIBCO EMS). By default no IErrorHandler is registered and uncaught exceptions are logged at error level.

exception-listener

A reference to a Spring.Messaging.Nms.Core.IExceptionListener or TIBCO.EMS.IExceptionListener as appropriate. It is invoked in case of an NMSException or EMSException.