Chapter 33. Message Oriented Middleware - MSMQ

33.1. Introduction

The goals of Spring's MSMQ 3.0 messaging support is to raise the level of abstraction when writing MSMQ applications. The System.Messaging API is a low-level API that provides the basis for creating a messaging application. However, 'Out-of-the-box', System.Messaging leaves the act of creating sophisticated multi-threaded messaging servers and clients as an infrastructure activity for the developer. Spring fills this gap by proving easy to use helper classes that makes creating an enterprise messaging application easy. These helper classes take into account the nuances of the System.Messaging API, such as its lack of thread-safety in many cases, the handling of so-called 'poison messages' (messages that are endlessly redelivered due to an unrecoverable exception during message processing), and combining database transactions with message transactions. Other goals of Spring's MSMQ messaging support are to support messaging best practices, in particular encouraging a clean architectural layering that separates the messaging middleware specifics from the core business processing.

Spring's approach to distributed computing has always been to promote a plain old CLR object approach or a POCO programming model. In this approach plain CLR objects are those that are devoid of any reference to a particular middleware technology. Spring provides the 'adapter' classes that converts between the middleware world, in this case MSMQ, and the oo-world of your business processing. This is done through the use of Spring's MessageListenerAdapter class and IMessageConverters.

The namespace Spring.Messaging provides the core functionality for messaging. It contains the class MessageQueueTemplate that simplifies the use of System.Messaging.MessageQueue by handling the lack of thread-safety in most of System.Messaging.MessageQueue's methods (for example Send). A single instance of MessageQueueTemplate can be used throughout your application and Spring will ensure that a different instance of a MessageQueue class is used per thread when using MessageQueueTemplate's methods. This per-thread instance of a System.Messaging.MessageQueue is also available via its property MessageQueue. The MessageQueueTemplate class is also aware of the presence of either an 'ambient' System.Transaction's transaction or a local System.Messaging.MessageQueueTransaction. As such if you use MessageQueueTemplate's send and receive methods, unlike with plain use of System.Messaging.MessageQueue, you do not need to keep track of this information yourself and call the correct overloaded System.Messaging.MessageQueue method for a specific transaction environment. When using a System.Messaging.MessageQueueTransaction this would usually require you as a developer to come up with your own mechanism for passing around a MessageQueueTransaction to multiple classes and layers in your application. MessageQueueTemplate manages this for you, so you don't have to do so yourself. These resource management and transaction features of MessageQueueTemplate are quite analogous to the transactional features of Spring's AdoTemplate in case you are already familiar with that functionality.

For asynchronous reception Spring provides several multi-threaded message listener containers. You can pick and configure the container that matches your message transactional processing needs and configure poison-message handling policies. The message listener container leverages Spring's support for managing transactions. Both DTC, local messaging transactions, and local database transactions are supported. In particular, you can easily coordinate the commit and rollback of a local MessageQueueTransaction and a local database transaction when they are used together.

From a programming perspective, Spring's MSMQ support involves you configuring message listener containers and writing a callback function for message processing. On the sending side, it involves you learning how to use MessageQueueTemplate. In both cases you will quite likely want to take advantage of using MessageListenerConverters so you can better structure the translation from the System.Messaging.Message data structure to your business objects. After the initial learning hurdle, you should find that you will be much more productive leveraging Spring's helper classes to write enterprise MSMQ applications than rolling your own infrastructure. Feedback and new feature requests are always welcome.

The Spring.MsmqQuickstart application located in the examples directory of the distribution shows this functionality in action.

33.2. A quick tour for the impatient

Here is a quick example of how to use Spring's MSMQ support to create a client that sends a message and a multi-threaded server application that receives the message. (The client code could also be used as-is in a multi-threaded environment but this is not demonstrated).

On the client side you create an instance of the MessageQueueTemplate class and configure it to use a MessageQueue. This can be done programmatically but it is common to use dependency injection and Spring's XML configuration file to configure your client class as shown below.

  <object id='questionTxQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
    <property name='Path' value='.\Private$\questionTxQueue'/>
    <property name='MessageReadPropertyFilterSetAll' value='true'/>
  </object>

  <object id="messageQueueTemplate" type="Spring.Messaging.Core.MessageQueueTemplate, Spring.Messaging">
    <property name="MessageQueueObjectName" value="questionTxQueue"/>
  </object>

  <!-- Class you write -->
  <object id="questionService" type="MyNamespace.QuestionService, MyAssembly">
    <property name="MessageQueueTemplate" ref="messageQueueTemplate"/>
  <object>

The MessageQueue object is created via an instance of MessageQueueFactoryObject and the MessageQueueTemplate refers to this factory object by name and not by reference. The SimpleSender class looks like this

public class QuestionService : IQuestionService
{
  private MessageQueueTemplate messageQueueTemplate;

  public MessageQueueTemplate { 
    get { return messageQueueTemplate; }
    set { messageQueueTemplate = value; }
  }

  public void SendQuestion(string question)
  {
    MessageQueueTemplate.ConvertAndSend(question);
  }
}

This class can be shared across multiple threads and the MessageQueueTemplate will take care of managing thread local access to a System.Messaging.MessageQueue as well as any System.Messaging.IMessageFormatter instances.

Furthermore, since this is a transactional queue (only the name gives it away), the message will be sent using a single local messaging transaction. The conversion from the string to the underling message is managed by an instance of the IMessageConverter class. By default an implementation that uses an XmlMessageFormatter with a TargetType of System.String is used. You can configure the MessageQueueTemplate to use other IMessageConveter implementations that do conversions above and beyond what the 'stock' IMessageFormatters do. See the section on MessageConverters for more details.

On the receiving side we would like to consume the messages transactionally from the queue. Since no other database operations are being performed in our server side processing, we select the TransactionMessageListenerContainer and configure it to use the MessageQueueTransactionManager. The MessageQueueTransactionManager an implementation of Spring's IPlatformTransactionManager abstraction that provides a uniform API on top of various transaction manager (ADO.NET,NHibernate, MSMQ, etc). Spring's MessageQueueTransactionManager is responsible for createing, committing, and rolling back a MSMQ MessageQueueTransaction.

While you can create the message listener container programmatically, we will show the declarative configuration approach below

  <!-- Queue to receive from -->
  <object id='questionTxQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
    <property name='Path' value='.\Private$\questionTxQueue'/>
    <property name='MessageReadPropertyFilterSetAll' value='true'/>
  </object>

  <!-- MSMQ Transaction Manager -->
  <object id="messageQueueTransactionManager" type="Spring.Messaging.Core.MessageQueueTransactionManager, Spring.Messaging"/>

  <!-- Message Listener Container that uses MSMQ transactional for receives -->
  <object id="transactionalMessageListenerContainer" type="Spring.Messaging.Listener.TransactionalMessageListenerContainer, Spring.Messaging">
    <property name="MessageQueueObjectName" value="questionTxQueue"/>
    <property name="PlatformTransactionManager" ref="messageQueueTransactionManager"/>
    <property name="MaxConcurrentListeners" value="10"/>
    <property name="MessageListener" ref="messageListenerAdapter"/>
  </object>
 
  <!-- Adapter to call a POCO as a messaging callback -->
  <object id="messageListenerAdapter" type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
    <property name="HandlerObject" ref="questionHandler"/>
  </object>

  <!-- The POCO class that you write -->
  <object id="questionHandler" type="MyNamespace.QuestionHandler, MyAssembly"/>
  

We have specified the queue to listen, that we want to consume the messages transactionally, process messages from the queue using 10 threads, and that our plain object that will handle the business processing is of the type QuestionHandler. The only class you need to write, QuestionHandler, looks like

public class QuestionHandler : IQuestionHandler
{
  public void HandleObject(string question)
  {
     // perform message processing here

     Console.WriteLine("Received question: " + question);

     // use an instance of MessageQueueTemplate and have other MSQM send operations
     // partake in the same local message transaction used to receive
  }

}

That is general idea. You write the sender class using MessageQueueTemplate and the consumer class which does not refer to any messaging specific class. The rest is configuration of Spring provided helper classes.

Note that if the HandleObject method has returned a string value a reply message would be sent to a response queue. The response queue would be taken from the Message's own ResponseQueue property or can be specified explicitly using MessageListenerAdapter's DefaultResponseQueueName property.

If an exception is thrown inside the QuestionHandler, then the MSMQ transaction is rolled back, putting the message back on the queue for redelivery. If the exception is not due to a transient error in the system, but a logical processing exception, then one would get endless redelivery of the message - clearly not a desirable situation. These messages are so called 'poison messages' and a strategy needs to be developed to deal with them. This is left as a development task if you when using the System.Messaging APIs but Spring provides a strategy for handling poison messages, both for DTC based message reception as well as for local messaging transactions.

In the last part this 'quick tour' we will configure the message listener container to handle poison messages. This is done by creating an instance of SendToQueueExceptionHandler and setting the property MaxRetry to be the number of exceptions or retry attempts we are willing to tolerate before taking corrective actions. In this case, the corrective action is to send the message to another queue. We can then create other message listener containers to read from those queues and handle the messages appropriately or perhaps you will avoid automated processing of these messages and take manual corrective actions.

  <!-- The 'error' queue to send poison messages -->
  <object id='errorQuestionTxQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
    <property name='Path' value='.\Private$\errorQuestionTxQueue'/>
    <property name='MessageReadPropertyFilterSetAll' value='true'/>
  </object>

  <!-- Message Listener Container that uses MSMQ transactional for receives -->
  <object id="transactionalMessageListenerContainer" type="Spring.Messaging.Listener.TransactionalMessageListenerContainer, Spring.Messaging">

    <!-- as before but adding -->
    <property name="MessageTransactionExceptionHandler" ref="messageTransactionExceptionHandler"/>
  </object>

  <!-- Poison message handling policy -->
  <object id="messageTransactionExceptionHandler" type="Spring.Messaging.Listener.SendToQueueExceptionHandler, Spring.Messaging">
    <property name="MaxRetry" value="5"/>
    <property name="MessageQueueObjectName" value="errorQuestionTxQueue"/>
  </object>

In the event of an exception while processing the message, the message transaction will be rolled back (putting the message back on the queue questionTxQueue for redelivery). If the same message causes an exception in processing 5 times ,then it will be sent transactionally to the errorQuestionTxQueue and the message transaction will commit (removing it from the queue questionTxQueue). You can also specify that certain exceptions should commit the transaction (remove from the queue) but this is not shown here ,see below for more informatio non this functionality The SendToQueueExceptionHandler implements the interface IMessageTransactionExceptionHandler (discussed below) so you can write your own implementations should the provided ones not meet your needs.

That's the quick tour folks. Hopefully you got a general feel for how things work, what requires configuration, and what is the code you need to write. The following sections describe each of Spring's helper classes in more detail. The sample application that ships with Spring is also a good place to get started.

33.3. Using Spring MSMQ

33.3.1. MessageQueueTemplate

The MessageQueueTemplate is used for synchronously sending and receiving messages. A single instance can be shared across multiple threads, unlike the standard System.Messaging.MessageQueue class. (One less resource management issue to worry about!) A thread-local instance of the MessageQueue class is available via MessageQueueTemplate's property MessageQueue. A MessageQueueTemplate is created by passing a reference to the name of a MessageQueueFactoryObject, you can think of it as a friendly name for your MessagingQueue and the recipe of how to create an instance of it. See the following section on MessageQueueFactoryObject for more information.

The MessageQueueTemplate also provides several convenience methods for sending and receiving messages. A family of overloaded ConvertAndSend and ReceiveAndConvert methods allow you to send and receive an object. The default message queue to send and receive from is specified using the MessageQueueTemplate's property MessageQueueObjectName. The responsibility of converting the object to a Message and vice versa is given to the template's associated IMessageConverter implementation. This can be set using the property MessageConverter. The default implementation, XmlMessageConverter, uses an XmlMessageFormatter with its TargetType set to System.String. Note that System.Messaging.IMessageFormatter classes are also not thread safe, so MessageQueueTemplate ensures that thread-local instances of IMessageConverter are used (as they generally wrap IMessageFormatter's that are not thread-safe).

You can use the MessageQueueTemplate to send messages to other MessageQueues by specifying their queue 'object name', the name of the MessageQueueFactoryObject.

The family of overloaded ConvertAndSend and ReceiveAndConvert methods are shown below

void ConvertAndSend(object obj);

void ConvertAndSend(object obj, MessagePostProcessorDelegate messagePostProcessorDelegate);

void ConvertAndSend(string messageQueueObjectName, object message);

void ConvertAndSend(string messageQueueObjectName, object obj, MessagePostProcessorDelegate messagePostProcessorDelegate);

object ReceiveAndConvert();

object ReceiveAndConvert(string messageQueueObjectName);

The transactional settings of the underlying overloaded System.Messaging.MessageQueue Send method that are used are based on the following algorithm.

  1. If the message queue is transactional and there is an ambient MessageQueueTransaction in thread local storage (put there via the use of Spring's MessageQueueTransactionManager or TransactionalMessageListenerContainer), the message will be sent transactionally using the MessageQueueTransaction object in thread local storage.

    [Note]Note

    This lets you group together multiple messaging operations within the same transaction without having to explicitly pass around the MessageQueueTransaction object.

  2. f the message queue is transactional but there is no ambient MessageQueueTransaction, then a single message transaction is created on each messaging operation. (MessageQueueTransactionType = Single).

  3. If there is an ambient System.Transactions transaction then that transaction will be used (MessageQueueTransactionType = Automatic).

  4. If the queue is not transactional, then a non-transactional send (MessageQueueTransactionType = None) is used.

The delegate MessagePostProcessorDelegate has the following signature

public delegate Message MessagePostProcessorDelegate(Message message);

This lets you modify the message after it has been converted from and object to a message using the IMessageConverter but before it is sent. This is useful for setting Message properties (e.g. CorrelationId, AppSpecific, TimeToReachQueue). Using anonymous delegates in .NET 2.0 makes this a very succinct coding task. If you have elaborate properties that need to be set, perhaps creating a custom IMessageConverter would be appropriate.

Overloaded Send and Receive operations that use the algorithm listed above to set transactional delivery options are also available. These are listed below

Message Receive();

Message Receive(string messageQueueObjectName);

void Send(Message message);

void Send(string messageQueueObjectName, Message message);

void Send(MessageQueue messageQueue, Message message);

Note that in the last Send method that takes a MessageQueue instance, it is the callers responsibility to ensure that this instance is not accessed from multiple threads. This Send method is commonly used when getting the MessageQueue from the ResponseQueue property of a Message during an asynchronous receive process. The receive timeout of the Receive operations is set using the ReceiveTimeout property of MessageQueueTemplate. The default value is MessageQueue.InfiniteTimeout (which is actually ~3 months).

The XML configuration snippit for defining a MessageQueueTemplate is shown in the previous section and also is located in the MSMQ quickstart application configuraiton file Messaging.xml

33.3.2. MessageQueueFactoryObject

The MessageQueueFactoryObject is responsible for creating MessageQueue instances. You configure the factory with some basic information, namely the constructor parameters you are familiar with already when creating a standard MessageQueue instance, and then setting MessageQueue properties, such a Label etc. Some configuration tasks of a MessageQueue involve calling methods, for example to set which properties of the message to read. These available as properties to set on the MessageQueueFactoryObject. An example declarative configuration is shown below

  <object id='testqueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
    <!-- propeties passed to the MessageQueue constructor -->
    <property name='Path' value='.\Private$\testqueue'/>
    <property name='DenySharedReceive' value='true'/>
    <property name='AccessMode' value='Receive'/>
    <property name='EnableCache' value='true'/>
    <!-- properties that call configuration methods on the MessageQueue -->
    <property name='MessageReadPropertyFilterSetAll' value='true'/>
    <property name='ProductTemplate'>
      <object>
        <property name='Label' value='MyLabel'/>
        <!-- other MessageQueue properties can be set here -->
      </object>
    </property>
  </object>

Whenever an object reference is made to 'testqueue' an new instance of the MessageQueue class is created. This Spring's so-called 'prototype' model, which differs from 'singleton' mode. In the singleton creation mode whenever an object reference is made to a 'testqueue' the same MessageQueue instance would be used. So that a new instance can be retrieved based on need, the message listener containers take as an argument the name of the MessageQueueFactoryObject and not a reference. (i.e. use of 'value' instead of 'ref' in the XML).

[Note]Note

The MessageQueueFactoryObject class is an ideal candidate for use of a custom namespace. This will be provided in the future. This will allow you to use VS.NET IntelliSense to configure this commonly used object. An example of the potential syntax is shown below

<mq:messageQueue id="testqueue" path=".\Private$\testqueue" MessageReadPropertyFilterSetAll="true">
  <mq:properties label="MyLabel"/>
</mq:messageQueue>

33.3.3. MessageQueue and IMessageConverter resource management

MessageQueues and IMessageFormatters (commonly used in IMessageConverter implementations) are not thread-safe. For example, only the following methods on MessageQueue are thread-safe, BeginPeek, BeginReceive, EndPeek, EndReceive, GetAllMessages, Peek, and Receive.

To isolate the creation logic of these classes, the factory interface IMessageQueueFactory is used. The interface is shown below

    public interface IMessageQueueFactory
    {
        MessageQueue CreateMessageQueue(string messageQueueObjectName);

        IMessageConverter CreateMessageConverter(string messageConverterObjectName);
    }

A provided implementation, DefaultMessageQueueFactory will create an instance of each class per-thread. It delegates the creation of the MessageQueue instance to the Spring container. The argument, messageConverterObjectName, must be the id/name of a MessageQueueFactoryObject defined in the Spring container.

DefaultMessageQueueFactory leverages Spring's local thread storage support so it will work correctly in stand alone and web applications.

You can use the DefaultMessageQueueFactory independent of the rest of Spring's MSMQ support should you need only the functionality it offers. MessageQueueTemplate and the listener containers create an instance of DefaultMessageQueueFactory by default. Should you want to share the same instance across these two classes, or provide your own custom implementation, use the property MessageQueueFactory on either MessageQueueTemplate or the message listener classe.s

33.3.4. Message Listener Containers

One of the most common uses of MSMQ is to concurrently process messages delivered asynchronously. This support is provided in Spring by message listener containers. A message listener container is the intermediary between an IMessageListener and a MessageQueue. (Note, message listener containers are conceptually different than Spring's Inversion of Control container, though it integrates and leverages the IoC container.) The message listener container takes care of registering to receive messages, participating in transactions, resource acquisition and release, exception conversion and suchlike. This allows you as an application developer to write the (possibly complex) business logic associated with receiving a message (and possibly responding to it), and delegate boilerplate MSMQ infrastructure concerns to the framework.

A subclass of AbstractMessageListenerContainer is used to receive messages from a MessageQueue. Which subclass you pick depends on your transaction processing requirements. The following subclasses are available in the namespace Spring.Messaging.Listener

  • NonTransactionalMessageListenerContainer - does not surround the receive operation with a transaction

  • TransactionalMessageListenerContainer - surrounds the receive operation with local (non-DTC) based transaction(s).

  • DistributedTxMessageListenerContainer - surrounds the receive operation with a distributed (DTC) transaction

Each of these containers use an implementation in which is based on Peeking for messages on a MessageQueue. Peeking is the only resource efficient approach that can be used in order to have MessageQueue receipt in conjunction with transactions, either local MSMQ transactions, local ADO.NET based transactions, or DTC transactions. Each container can specify the number of threads that will be created for processing messages after the Peek occurs via the property MaxConcurrentListeners. Each processing thread will continue to listen for messages up until the timeout value specified by ListenerTimeLimit or until there are no more messages on the queue (whichever comes first). The default value of ListenerTimeLimit is TimeSpan.Zero, meaning that only one attempt to receive a message from the queue will be performed by each listener thread. The current implementation uses the standard .NET thread pool. Future implementations will use a custom (and pluggable) thread pool.

33.3.4.1. NonTransactionalMessageListenerContainer

This container performs a Receive operation on the MessageQueue without any transactional settings. As such messages will not be redelivered if an exception is thrown during message processing. Exceptions during message processing can be handled via an implementation of the interface IExceptionHandler. This can be set via the property ExceptionHandler on the listener. The IExceptionHandler interface is shown below

    public interface IExceptionHandler
    {
        void OnException(Exception exception, Message message);
    }

An example of configuring a NonTransactionalMessageListenerContainer with an IExceptionHandler is shown below

  <!-- Queue to receive from -->
  <object id='msmqTestQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
    <property name='Path' value='.\Private$\testqueue'/>
    <property name='MessageReadPropertyFilterSetAll' value='true'/>
    <property name='ProductTemplate'>
      <object>
        <property name='Label' value='MyTestQueueLabel'/>
      </object>
    </property>
  </object>

  <!-- Queue to respond to -->
  <object id='msmqTestResponseQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
    <property name='Path' value='.\Private$\testresponsequeue'/>
    <property name='MessageReadPropertyFilterSetAll' value='true'/>
    <property name='ProductTemplate'>
      <object>
        <property name='Label' value='MyTestResponseQueueLabel'/>
      </object>
    </property>
  </object>

  <!-- Listener container -->
  <object id="nonTransactionalMessageListenerContainer" type="Spring.Messaging.Listener.NonTransactionalMessageListenerContainer, Spring.Messaging">
    <property name="MessageQueueObjectName" value="msmqTestQueue"/>
    <property name="MaxConcurrentListeners" value="2"/>
    <property name="ListenerTimeLimit" value="20s"/>  <!-- 20 seconds -->
    <property name="MessageListener" ref="messageListenerAdapter"/>
    <property name="ExceptionHandler" ref="exceptionHandler"/>
  </object>

  <!-- Delegate to plain CLR object for message handling -->
  <object id="messageListenerAdapter" type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
    <property name="DefaultResponseQueueName" value="msmqTestResponseQueue"/>
    <property name="HandlerObject" ref="simpleHandler"/>
  </object>

  <!-- Classes you need to write -->
  <object id="simpleHandler" type="MyNamespace.SimpleHandler, MyAssembly"/>

  <object id="exceptionHandler" type="MyNamespace.SimpleExceptionHandler, MyAssembly"/>

The SimpleHandler class would look something like this

public class SimpleHandler : ISimpleHandler
{
  public void HandleObject(string txt)
  {
     // perform message processing...
     Console.WriteLine("Received text: " + txt);
  }
}

33.3.4.2. TransactionalMessageListenerContainer

This message listener container performs receive operations within the context of local transaction. This class requires an instance of Spring's IPlatformTransactionManager, either AdoPlatformTransactionManager, HibernateTransactionManager, or MessageQueueTransactionManager.

If you specify a MessageQueueTransactionManager then a MessageQueueTransaction will be started before receiving the message and used as part of the container's receive operation. As with other IPlatformTransactionManager implementation's, the transactional resources (in this case an instance of the MessageQueueTransaction class) is bound to thread local storage. MessageQueueTemplate will look in thread-local storage and use this 'ambient' transaction if found for its send and receive operations. The message listener is invoked and if no exception occurs, then the MessageQueueTransactionManager will commit the MessageQueueTransaction.

The message listener implementation can call into service layer classes that are made transactional using standard Spring declarative transactional techniques. In case of exceptions in the service layer, the database operation will be rolled back (nothing new here), and the TransactionalMessageListenerContainer will call it's IMessageTransactionExceptionHandler implementation to determine if the MessageQueueTransaction should commit (removing the message from the queue) or rollback (leaving the message on the queue for redelivery).

[Note]Note

The use of a transactional service layer in combination with a MessageQueueTransactionManager is a powerful combination that can be used to achieve "exactly one" transaction message processing with database operations. This requires a little extra programming effort and is a more efficient alternative than using distributed transactions which are commonly associated with this functionality since both the database and the message transaction commit or rollback together.

The additional programming logic needed to achieve this is to keep track of the Message.Id that has been processed successfully within the transactional service layer. This is needed as there may be a system failure (e.g. power goes off) between the 'inner' database commit and the 'outer' messaging commit, resulting in message redelivery. The transactional service layer needs logic to detect if incoming message was processed successfully. It can do this by checking the database for an indication of successful processing, perhaps by recording the Message.Id itself in a status table. If the transactional service layer determines that the message has already been processed, it can throw a specific exception for this case. The container's exception handler will recognize this exception type and vote to commit (remove from the queue) the 'outer' messaging transaction. Spring provides an exception handler with this functionality, see SendToQueueExceptionHandler described below.

An example of configuring the TransactionalMessageListenerContainer using a MessageQueueTransactionManager is shown below

  <!-- Queue to receive from -->
  <object id='msmqTestQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
    <property name='Path' value='.\Private$\testqueue'/>
    <property name='MessageReadPropertyFilterSetAll' value='true'/>
    <property name='ProductTemplate'>
      <object>
        <property name='Label' value='MyTestQueueLabel'/>
      </object>
    </property>
  </object>

  <!-- Queue to respond to -->
  <object id='msmqTestResponseQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
    <property name='Path' value='.\Private$\testresponsequeue'/>
    <property name='MessageReadPropertyFilterSetAll' value='true'/>
    <property name='ProductTemplate'>
      <object>
        <property name='Label' value='MyTestResponseQueueLabel'/>
      </object>
    </property>
  </object>

  <!-- Transaction Manager for MSMQ Messaging -->
  <object id="messageQueueTransactionManager" type="Spring.Messaging.Core.MessageQueueTransactionManager, Spring.Messaging"/>

  <!-- The transaction message listener container -->
  <object id="transactionalMessageListenerContainer" type="Spring.Messaging.Listener.TransactionalMessageListenerContainer, Spring.Messaging">
    <property name="MessageQueueObjectName" value="msmqTestQueue"/>
    <property name="PlatformTransactionManager" ref="messageQueueTransactionManager"/>
    <property name="MaxConcurrentListeners" value="5"/>
    <property name="ListenerTimeLimit" value="20s"/>
    <property name="MessageListener" ref="messageListenerAdapter"/>
    <property name="MessageTransactionExceptionHandler" ref="messageTransactionExceptionHandler"/>
  </object>

  <!-- Delegate to plain CLR object for message handling -->
  <object id="messageListenerAdapter" type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
    <property name="DefaultResponseQueueName" value="msmqTestResponseQueue"/>
    <property name="HandlerObject" ref="simpleHandler"/>
  </object>

  <!-- Poison message handling -->
  <object id="messageTransactionExceptionHandler" type="Spring.Messaging.Listener.SendToQueueExceptionHandler, Spring.Messaging">
    <property name="MaxRetry" value="5"/>
    <property name="MessageQueueObjectName" value="testTxErrorQueue"/>
  </object>

  <!-- Classes you need to write -->
  <object id="simpleHandler" type="MyNamespace.SimpleHandler, MyAssembly"/>


If you specify either AdoPlatformTransactionManager or HibernateTransactionManager then a local database transaction will be started before the receiving the message. By default, the container will also start a local MessageQueueTransaction after the local database transaction has started, but before the receiving the message. This MessageQueueTransaction will be used to receive the message. By default the MessageQueueTransaction will be bound to thread local storage so that any MessageQueueTemplate send or receive operations will participate transparently in the same MessageQueueTransaction. If you do not want this behavior set the property ExposeContainerManagedMessageQueueTransaction to false.

In case of exceptions during IMessageListener processing when using either either AdoPlatformTransactionManager or HibernateTransactionManager the container's IMessageTransactionExceptionHandler will determine if the MessageQueueTransaction should commit (removing it from the queue) or rollback (placing it back on the queue for redelivery). The listener exception will always trigger a rollback in the 'outer' database transaction.

Poison message handing, that is, the endless redelivery of a message due to exceptions during processing, can be detected using implementations of the IMessageTransactionExceptionHandler. This interface is shown below

public interface IMessageTransactionExceptionHandler
{
    TransactionAction OnException(Exception exception, Message message,  MessageQueueTransaction messageQueueTransaction);
}

The return value is an enumeration with the values Commit and Rollback. A specific implementation is provided that will move the poison message to another queue after a maximum number of redelivery attempts. See SendToQueueExceptionHandler described below. You can set a specific implementation to by setting TransactionalMessageListenerContainer's property MessageTransactionExceptionHandler

The IMessageTransactionExceptionHandler implementation SendToQueueExceptionHandler keeps track of the Message's Id property in memory with a count of how many times an exception has occurred. If that count is greater than the handler's MaxRetry count it will be sent to another queue using the provided MessageQueueTransaction. The queue to send the message to is specified via the property MessageQueueObjectName.

33.3.4.3. DistributedTxMessageListenerContainer

This message listener container performs receive operations within the context of distributed transaction. A distributed transaction is started before a message is received. The receive operation participates in this transaction using by specifying MessageQueueTransactionType = Automatic. The transaction that is started is automatically promoted to two-phase-commit to avoid the default behavior of transaction promotion since the only reason to use this container is to use two different resource managers (messaging and database typically).

The commit and rollback semantics are simple, if the message listener does not throw an exception the transaction is committed, otherwise it is rolled back.

Exceptions in message listener processing are handled by implementations of the IDistributedTransactionExceptionHandler interface. This interface is shown below

    public interface IDistributedTransactionExceptionHandler
    {
        bool IsPoisonMessage(Message message);

        void HandlePoisonMessage(Message poisonMessage);        

        void OnException(Exception exception, Message message);
    }

the IsPoisonMessage method determines whether the incoming message is a poison message. This method is called before the IMessageListener is invoked. The container will call HandlePoisonMessage is IsPoisonMessage returns true and will then commit the distributed transaction (removing the message from the queue. Typical implementations of HandlePoisonMessage will move the poison message to another queue (under the same distributed transaction used to receive the message). The class SendToQueueDistributedTransactionExceptionHandler detects poison messages by tracking the Message Id property in memory with a count of how many times an exception has occurred. If that count is greater than the handler's MaxRetry count it will be sent to another queue. The queue to send the message to is specified via the property MessageQueueObjectName.

33.4. MessageConverters

33.4.1. Using MessageConverters

In order to facilitate the sending of business model objects, the MessageQueueTemplate has various send methods that take a .NET object as an argument for a message's data content. The overloaded methods ConvertAndSend and ReceiveAndConvert in MessageQueue delegate the conversion process to an instance of the IMessageConverter interface. This interface defines a simple contract to convert between .NET objects and JMS messages. The interface is shown below

    public interface IMessageConverter : ICloneable
    {
        Message ToMessage(object obj);

        object FromMessage(Message message);
    }

There are a standard implementations provided the simply wrap existing IMessageFormatter implementations.

  • XmlMessageConverter - uses a XmlMessageFormatter.

  • BinaryMessageConverter - uses a BinaryMessageFormatter

  • ActiveXMessageConverter - uses a ActiveXMessageFormatter

The default implementation used in MessageQueueTemplate and the message listener containers is an instance of XmlMessageConverter configured with a TargetType to be System.String. You specify the types that the XmlMessageConverter can convert though either the array property TargetTypes or TargetTypeNames. Here is an example taken from the QuickStart application

  <object id="xmlMessageConverter" singleton="false" type="Spring.Messaging.Support.Converters.XmlMessageConverter, Spring.Messaging">
    <property name="TargetTypes">
      <list>
        <value>Spring.MsmqQuickStart.Common.Data.TradeRequest, Spring.MsmqQuickStart.Common</value>
        <value>Spring.MsmqQuickStart.Common.Data.TradeResponse, Spring.MsmqQuickStart.Common</value>
        <value>System.String, mscorlib</value>        
      </list>
    </property>
  </object>

You can specify other IMessageConverter implementations using the MessageConverterObjectName property on the MessageQueueTemplate and MessageListenerAdapter.

[Note]Note

The scope of the object definition is set to singleton="false", meaning that a new instance of the MessageConverter will be created each time you ask the container for an object of the name 'xmlMessageConverter'. This is important to ensure that a new instance will be used for each thread. If you forget, a warning will be logged and IMessageConverter's Clone() method will be called to create an indepentend instance.

Other implementations provided are

  • XmlDocumentConverter - loads and saves an XmlDocument to the message BodyStream. This lets you manipulate directly the XML data independent of type serialization issues. This is quite useful if you use XPath expressions to pick out the relevant information to construct your business objects.

Other potential implementations:

  • RawBytesMessageConverter - directly write raw bytes to the message stream, compress

  • CompressedMessageConverter - compresses the message payload

  • EncryptedMessageConverter - encrypt the message (standard MSMQ encryptiong has several limitations)

  • SoapMessageConverter - use soap formatting.

33.5. Interface based message processing

33.5.1.1. MessageListenerAdapater

The MessageListenerAdapter allows methods of a class that does not implement the IMessageListener interface to be invoked upon message delivery. Lets call this class the 'message handler' class. To achieve this goal the MessageListenerAdapter implements the standard IMessageListener interface to receive a message and then delegates the processing to the message handler class. Since the message handler class does not contain methods that refer to MSMQ artifacts such as Message, the MessageListenerAdapter uses a IMessageConverter to bridge the MSMQ and 'plain object' worlds. As a reminder, the default XmlMessageConverter used in MessageQueueTemplate and the message listener containers converts from Message to string. Once the incoming message is converted to an object (string for example) a method with the name 'HandleMessage' is invoked via reflection passing in the string as an argument.

Using the default configuration of XmlMessageConverter in the message listeners, a simple string based message handler would look like this.

public class MyHandler
{

    public void HandleMessage(string text)
    {
      ...
    }
     
}

The next example has a similar method signature but the name of the handler method name has been changed to "DoWork", by setting the adapter's property DefaultHandlerMethod.

public interface IMyHandler
{
    void DoWork(string text);
}

If your IMessageConverter implementation will return multiple object types, overloading the handler method is perfectly acceptable, the most specific matching method will be used. A method with an object signature would be consider a 'catch-all' method of last resort.

public interface IMyHandler
{
   void DoWork(string text);
   void DoWork(OrderRequest orderRequest);
   void DoWork(InvoiceRequest invoiceRequest);
   void DoWork(object obj);
}

Another of the capabilities of the MessageListenerAdapter class is the ability to automatically send back a response Message if a handler method returns a non-void value. Any non-null value that is returned from the execution of the handler method will (in the default configuration) be converted to a string. The resulting string will then be sent to the ResponseQueue defined in the Message's ResponseQueue property of the original Message, or the DefaultResponseQueueName on the MessageListenerAdapter (if one has been configured) will be used. If not ResponseQueue is found then an Spring MessagingException will be thrown. Please note that this exception will not be swallowed and will propagate up the call stack.

Here is an example of Handler signatures that have various return types.

public interface IMyHandler
{
   string DoWork(string text);
   OrderResponse DoWork(OrderRequest orderRequest);
   InvoiceResponse DoWork(InvoiceRequest invoiceRequest);
   void DoWork(object obj);
}

The following configuration shows how to hook up the adapter to process incoming MSMQ messages using the default message converter.

  <!-- Delegate to plain CLR object for message handling -->
  <object id="messageListenerAdapter" type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
    <property name="DefaultResponseQueueName" value="msmqTestResponseQueue"/>
    <property name="HandlerObject" ref="myHandler"/>
  </object>

33.6. Comparison with using WCF

The goals of Spring's MSMQ messaging support are quite similar to those of WCF with its MSMQ related bindings, in as much as a WCF service contract is a POCO (minus the attributes if you really picky about what you call a POCO). Spring's messaging support can give you the programming convenience of dealing with POCO contracts for message receiving but does not (at the moment) provide a similar POCO contract for sending, instead relying on explicit use of the MessageQueueTemplate class. This feature exists - some question whether it should for messaging - in the Java version of the Spring framework, see JmsInvokerServiceExporter and JmsInvokerProxyFactoryBean.

The good news is that if and when it comes time to move from a Spring MSMQ solution to WCF, you will be in a great position as the POCO interface used for business processing when receiving in a Spring based MSMQ application can easily be adapted to a WCF environment. There may also be some features unique to MSMQ and/or Spring's MSMQ support that you may find appealing over WCF. Many messaging applications still need to be 'closer to the metal' and this is not possible using the WCF bindings, for example Peeking and Label, AppSpecific properties, multicast.. An interesting recent quote by Yoel Arnon (MSMQ guru) "With all the respect to WCF, System.Messaging is still the major programming model for MSMQ programmers, and is probably going to remain significant for the foreseeable future. The message-oriented programming model is different from the service-oriented model of WCF, and many real-world solutions would always prefer it."