Spring Integration 5 – Message Channels, Introduction and Common Properties

By | December 31, 2017

This article is part of a series introducing Spring Integration 5. Most of the examples can probably be applied to earlier versions of Spring Integration as well; I happened to start out with Spring Integration 5 and have had little reason to investigate compatibility with previous versions.

In this article I will introduce Spring Integration message channels and examine the common properties found in all message channels.

Examples

All the examples in this series of articles are taken from my Spring Integration Scrutinized repository that is available on GitHub.

Types of Message Channels

Message channels in Spring Integration can be divided into two main categories:

  • Subscribable message channels
  • Pollable message channels

Subscribable message channels are message channels to which one or more consumers register themselves (subscribe) in order to receive the messages posted onto the message channel. With a point-to-point subscribable channel such as DirectChannel, a message is delivered to only one of the consumers that subscribed before the message was posted; a PublishSubscribeChannel instead delivers the message to all of its subscribers.

Subscribable message channel with three consumers.
Consumer 3 consumes the message posted to the message channel by the producer.

Pollable message channels store the messages posted to the message channel until a consumer comes and requests a message from the message channel. Consumers do not subscribe to a pollable message channel.

Pollable message channel to which a producer posts a message. A consumer polls the message channel at some later point in time and consumes the message posted to the message channel earlier.

In each of these two message channel categories there are several different types of message channels, as we will see later. First we’ll have a look at the similarities.
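To make the difference between the two categories concrete, here is a minimal stand-alone sketch that uses only the JDK. The classes SubscribableChannelSketch and PollableChannelSketch are hypothetical models written for this illustration and are not Spring Integration classes; messages are plain strings, delivery to a single subscriber is done round-robin, and returning false when nobody has subscribed is a simplification of this model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical model of a subscribable channel: each message goes to exactly one subscriber. */
class SubscribableChannelSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int nextSubscriber = 0;

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers immediately to one subscriber, chosen round-robin; false if nobody has subscribed. */
    boolean send(String message) {
        if (subscribers.isEmpty()) {
            return false;
        }
        subscribers.get(nextSubscriber).accept(message);
        nextSubscriber = (nextSubscriber + 1) % subscribers.size();
        return true;
    }
}

/** Hypothetical model of a pollable channel: messages wait in a buffer until a consumer asks. */
class PollableChannelSketch {
    private final Queue<String> buffer = new ConcurrentLinkedQueue<>();

    boolean send(String message) {
        return buffer.offer(message);
    }

    /** Returns the oldest buffered message, or null if the channel is empty. */
    String receive() {
        return buffer.poll();
    }
}

class ChannelCategoriesDemo {
    public static void main(String[] args) {
        SubscribableChannelSketch subscribable = new SubscribableChannelSketch();
        subscribable.subscribe(m -> System.out.println("consumer 1 got " + m));
        subscribable.subscribe(m -> System.out.println("consumer 2 got " + m));
        subscribable.send("first");   // pushed to consumer 1 during the send call
        subscribable.send("second");  // pushed to consumer 2 during the send call

        PollableChannelSketch pollable = new PollableChannelSketch();
        pollable.send("stored");      // stays in the buffer until someone polls
        System.out.println("polled " + pollable.receive());
    }
}
```

The point of the model is the direction of control: with the subscribable variant the channel calls the consumer, while with the pollable variant the consumer calls the channel.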

Common Properties of Message Channels

All Spring Integration message channels implement the MessageChannel interface found in the package org.springframework.messaging. The inheritance hierarchy of MessageChannel can be seen in the figure below.

Spring Integration 5 message channels class hierarchy.

Note the PollableChannel and SubscribableChannel interfaces, which are the roots of the two main message channel categories. Also note the FluxMessageChannel class, a concrete message channel implementation that implements neither PollableChannel nor SubscribableChannel.

The MessageChannel interface contains only two send methods, with the following signatures:

  • boolean send(Message<?> message)
    Sends a message to the message channel with an indefinite timeout and returns true if sending the message was successful.
  • boolean send(Message<?> message, long timeout)
    Sends a message to the message channel, waiting at most the specified timeout in milliseconds, and returns true if the message was sent or false if it was not, for instance because the timeout elapsed.

Sending one or more messages is done in almost all the examples related to message channels and no special examples will be given.
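Although sending gets no example of its own, the difference between the two send variants is easy to model. The following is a minimal stand-alone sketch using only the JDK; BoundedChannelSketch is a hypothetical class, not part of Spring Integration, and it assumes a channel with a fixed capacity so that a send can actually time out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical bounded channel used to illustrate send with and without a timeout. */
class BoundedChannelSketch {
    private final BlockingQueue<String> queue;

    BoundedChannelSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Waits indefinitely for free space; returns false only if the thread is interrupted. */
    boolean send(String message) {
        try {
            queue.put(message);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Waits at most timeoutMillis for free space; returns false if the timeout elapses. */
    boolean send(String message, long timeoutMillis) {
        try {
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns the oldest message, or null if the channel is empty. */
    String receive() {
        return queue.poll();
    }
}

class SendTimeoutDemo {
    public static void main(String[] args) {
        BoundedChannelSketch channel = new BoundedChannelSketch(1);
        System.out.println("first send accepted: " + channel.send("one", 50));
        // The channel is full now, so this call gives up after roughly 50 ms.
        System.out.println("second send accepted: " + channel.send("two", 50));
    }
}
```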

The AbstractMessageChannel class is the class from which all the different types of message channels inherit. It implements support for the following features:

  • Retaining a history of messages sent through the message channel (tracking).
  • Message channel statistics.
  • Logging.
    Outputs log messages during the different stages of sending a message to the message channel if logging is enabled and the log level is set to debug or lower.
  • Configuring one or more message datatypes accepted by the message channel.
    In addition, a message converter can be configured on a message channel. The converter will attempt to convert the payload of any message with a different datatype into a datatype accepted by the message channel.
  • Interceptors.

Message History

This example from the MessageChannelsCommonTests class shows how to enable and retrieve message history for a message channel.

Note that:

  • The DirectChannel message channel used in this example is named using the setComponentName method.
    This is the name that will appear in the message history.
  • Message history is enabled for the individual message channel by calling the setShouldTrack method with the parameter true.
  • One single message is sent to the message channel.
    Thus we can expect one single message history entry to be generated as a result.
  • The message history object and a message history entry are logged.
    This produces console output that will be examined below.
  • The name property of the message history entry is asserted to be equal to the message channel name.

If the test is run, the two following lines can be seen in the console log:

From this one can conclude that the message history object contains the message history for an object named MyDirectChannel, which is the name of the direct message channel created in the test. If there are multiple message history entries in the message history object, the string representation will be a comma-separated list of the name properties from the entries.
The subsequent log-line shows that one message history entry contains three properties:

  • Name
    The name is the name of the component that logged the message history entry.
  • Type
    The type is the type of the component that logged the message history entry.
  • Timestamp
    The timestamp is the time at which the message history entry was logged.
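The shape of the data described above can be modelled in a few lines. This is a stand-alone sketch using only the JDK, with hypothetical class names (MessageHistorySketch, HistoryEntrySketch); it is not the Spring Integration MessageHistory class, only a model of a history whose entries carry a name, a type and a timestamp and whose string form is the comma-separated list of names. The type value "channel" is just sample data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical history entry: which component handled the message, and when. */
class HistoryEntrySketch {
    final String name;
    final String type;
    final long timestamp;

    HistoryEntrySketch(String name, String type, long timestamp) {
        this.name = name;
        this.type = type;
        this.timestamp = timestamp;
    }
}

/** Hypothetical message history: an ordered list of entries, one per tracked component. */
class MessageHistorySketch {
    private final List<HistoryEntrySketch> entries = new ArrayList<>();

    /** Called by a tracked component when a message passes through it. */
    void write(String componentName, String componentType) {
        entries.add(new HistoryEntrySketch(componentName, componentType, System.currentTimeMillis()));
    }

    List<HistoryEntrySketch> entries() {
        return entries;
    }

    /** Comma-separated names of the components the message has passed through. */
    @Override
    public String toString() {
        return entries.stream().map(e -> e.name).collect(Collectors.joining(","));
    }
}

class MessageHistoryDemo {
    public static void main(String[] args) {
        MessageHistorySketch history = new MessageHistorySketch();
        history.write("MyDirectChannel", "channel");
        System.out.println("history: " + history);
        System.out.println("entry type: " + history.entries().get(0).type);
    }
}
```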

Message Channel Statistics

There are two alternatives as far as message channel statistics are concerned: full statistics and counts only. The former not only gathers counts and timing information but also calculates mean values. The counts-only option will, as the name implies, only maintain counts of, for instance, how many messages have passed through the message channel.
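The difference between the two modes can be illustrated with a small stand-alone model that uses only the JDK. ChannelStatsSketch is a hypothetical class, not the Spring Integration metrics implementation, and it assumes a plain arithmetic mean of the send durations; the real framework may well compute its mean values differently.

```java
/** Hypothetical statistics holder with a counts-only mode and a full mode. */
class ChannelStatsSketch {
    private final boolean fullStatistics;
    private long sendCount;
    private long totalDurationNanos;

    ChannelStatsSketch(boolean fullStatistics) {
        this.fullStatistics = fullStatistics;
    }

    /** Records one send; the duration is only accumulated in full mode. */
    void recordSend(long durationNanos) {
        sendCount++;
        if (fullStatistics) {
            totalDurationNanos += durationNanos;
        }
    }

    long getSendCount() {
        return sendCount;
    }

    /** Arithmetic mean of the recorded durations; always zero in counts-only mode. */
    double getMeanSendDurationNanos() {
        if (!fullStatistics || sendCount == 0) {
            return 0.0;
        }
        return (double) totalDurationNanos / sendCount;
    }
}

class ChannelStatsDemo {
    public static void main(String[] args) {
        ChannelStatsSketch full = new ChannelStatsSketch(true);
        ChannelStatsSketch countsOnly = new ChannelStatsSketch(false);
        for (long duration : new long[] {100, 200, 300}) {
            full.recordSend(duration);
            countsOnly.recordSend(duration);
        }
        System.out.println("full: count=" + full.getSendCount()
                + " mean=" + full.getMeanSendDurationNanos());
        System.out.println("counts only: count=" + countsOnly.getSendCount()
                + " mean=" + countsOnly.getMeanSendDurationNanos());
    }
}
```

Counts-only mode is cheaper because nothing beyond a counter has to be updated for each message.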

Full Message Channel Statistics

From the MessageChannelsCommonTests class comes this example showing how to enable and retrieve full statistics for a message channel:

Note that:

  • After the message channel has been created, a DefaultMessageChannelMetrics object is created and set on the message channel using the configureMetrics method.
  • Having set the channel metrics object on the message channel, statistics are enabled on the message channel using the setStatsEnabled method.
    It is necessary to set the channel metrics object before enabling statistics, otherwise the channel metrics object will not be configured properly.
  • A number of messages are sent to the message channel.
    There is a random delay between each message to make the statistics more realistic.
  • Some simple assertions are made on the message channel metrics object.
  • Metrics are retrieved from the message channel metrics object and logged to the console.
    This is one, probably the less common, way metrics for a message channel can be retrieved.
  • Metrics are retrieved from the message channel itself and logged to the console.
    Message channels have a number of delegate methods which will retrieve metrics from the message channel metrics object, so it is not necessary to keep a reference to this object.

When the test is run, the following lines may be observed in the console:

We can see that statistics are indeed gathered and that mean values are calculated. Unsurprisingly, the same information is retrieved from both the message channel metrics object and the message channel itself.

Simple Message Channel Statistics

From the MessageChannelsCommonTests class comes this example showing how to enable and retrieve simple statistics for a message channel:

Note that:

  • No message channel metrics object is created or configured on the message channel.
    Instead we will rely on the default message channel metrics object that is created as part of creating the message channel.
  • Simple statistics are enabled on the message channel object using the setCountsEnabled method.
  • The log statements retrieving metrics directly from the message channel are identical to the ones in the full message channel statistics example above.
  • There is a bug, INT-4373, in Spring Integration 5.0.0 and earlier that will cause the error count to be incremented when simple statistics are enabled despite no errors having occurred.

Running the test, the following lines may be observed in the console:

It can be seen that all the rate and duration numbers are now zero. The only number that is not zero is the send count. The error count is also zero, but would have been non-zero if an error had occurred while sending a message to the message channel.

Message Channel Logging

Spring Integration allows message channels to log each message sent to the channel, both before and after the send operation. Turning on this type of logging requires two steps: set the log level to debug, either for a specific type of message channel or for all message channels, and enable logging on the particular message channel when it is created.

In the example project, I have used Logback for logging and the logback-test.xml file is the appropriate place to set the log-level to debug. The following line is to be added inside the <configuration> element:

The example test, again from the MessageChannelsCommonTests class, looks like this:

Note that:

  • After the message channel has been created and its name set, logging is enabled using the setLoggingEnabled method.
  • There are no assertions verifying the outcome.
    The result is log output, in this case to the console, as we are about to see.

If the test is run, the following log output can be observed in the console:

Some output has been removed to reduce the amount of text.
We can see that for each message sent to the message channel, two lines of log output are produced.
The first one is written before the message is sent:

The second line of log is produced after the message has been sent and no exceptions occurred in connection to sending the message:

The following information is logged:

  • Whether the logging is before (preSend) or after (postSend) the sending of the message.
  • The channel to which the message is sent.
    In the output above it is org.springframework.integration.channel.DirectChannel@61019f59.
  • In the case of postSend, the boolean result of the sending is logged.
    This boolean indicates whether the sending was successful.
  • The type of the message sent to the message channel.
    In the log above, the type of message is GenericMessage.
  • The payload of the message.
  • The headers of the message.

Accepted Datatypes in Message Channels

The following examples show how to restrict the message payload datatypes accepted by a message channel to one or more types (classes).
In addition, an example will show how a message converter may be registered with a message channel. Such a message converter will attempt to convert any type not readily accepted by a message channel into a type that is accepted.
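Before looking at the three cases, the decision a channel has to make can be sketched in plain Java. TypedChannelSketch below is a hypothetical stand-alone model using only the JDK, not a Spring Integration class: it accepts a single datatype, takes an optional converter function, and throws IllegalArgumentException on a mismatch, all of which are assumptions of this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical channel that only accepts payloads of one type, with an optional converter. */
class TypedChannelSketch {
    private final Class<?> acceptedType;
    private final Function<Object, Object> converter; // may be null: no conversion attempted
    private final List<Object> delivered = new ArrayList<>();

    TypedChannelSketch(Class<?> acceptedType, Function<Object, Object> converter) {
        this.acceptedType = acceptedType;
        this.converter = converter;
    }

    /** Accepts the payload as-is, converts it if possible, or rejects it with an exception. */
    boolean send(Object payload) {
        Object candidate = payload;
        if (!acceptedType.isInstance(candidate) && converter != null) {
            candidate = converter.apply(payload); // a null result means "cannot convert"
        }
        if (!acceptedType.isInstance(candidate)) {
            throw new IllegalArgumentException("Channel expected " + acceptedType.getName()
                    + " but received " + payload.getClass().getName());
        }
        delivered.add(candidate);
        return true;
    }

    List<Object> delivered() {
        return delivered;
    }
}

class TypedChannelDemo {
    public static void main(String[] args) {
        // Converter that only knows how to turn numeric strings into Long values.
        Function<Object, Object> stringToLong =
                p -> (p instanceof String) ? Long.valueOf((String) p) : null;

        TypedChannelSketch converting = new TypedChannelSketch(Long.class, stringToLong);
        converting.send(42L);        // datatype match
        converting.send("1234567");  // datatype conversion
        System.out.println("delivered: " + converting.delivered());

        TypedChannelSketch strict = new TypedChannelSketch(Long.class, null);
        try {
            strict.send("1234567");  // datatype mismatch, no converter configured
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```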

Datatype Match

The first example shows a success scenario; a message channel is configured to accept only message payloads of the type Long and a message containing a long integer is sent to the message channel.

Note that:

  • The type of the variable theInputMessage is Message<?>.
    This type was chosen in order to allow for any type of payload in the input message.
  • Having created the message channel, it is configured to accept only message payloads of the type Long using the setDatatypes method.
    This method takes a variable number of arguments, so in order to configure a message channel to accept more than one payload type just add further type parameters.

If we run the test, it should pass and the message, with the long integer payload, should be logged to the console:

Datatype Mismatch

The second example shows how to configure a message channel to only accept message payloads of the type Long and what happens when an attempt to send a message containing a string payload is made.

Note that:

  • The type of the variable theInputMessage is Message<?>.
    This type was chosen in order to allow for any type of payload in the input message.
  • Having created the message channel, it is configured to accept only message payloads of the type Long using the setDatatypes method.
  • The statement sending the message to the message channel is surrounded by a try-catch block.
    This was done in order to be able to log the exception to the console.

If the test is run it should pass and output similar to the following should be logged to the console:

There is an excellent error message telling us that the message channel MyDirectChannel expected only Long datatypes but received a String.

Datatype Conversion

The last example related to message channels and accepted datatypes shows how to configure a message converter on a message channel in order to handle datatypes that are not readily accepted by the message channel but can be converted into a datatype that is accepted.

Note that:

  • The type of the variables theInputMessage and theOutputMessage is Message<?>.
    This type was chosen in order to allow for any type of payload in the messages.
  • An input message is created with the payload being a string containing the number “1234567”.
  • Having created the message channel, it is configured to accept only message payloads of the type Long using the setDatatypes method.
  • A message converter is created and the message channel is configured to use this message converter using the setMessageConverter method.
    In the example the GenericMessageConverter available in Spring Integration is used, which will delegate any type conversion to the default conversion service.

If the test is run, it should pass and the following should be output to the console:

Message Channel Interceptors

Message channel interceptors are similar to around advice in aspect-oriented programming and to the Intercepting Filter pattern from Java EE. For those familiar with Java servlets, an interceptor is very similar to a servlet filter.
In other words, a message channel interceptor intercepts the sending of messages to and/or the receiving of messages from a message channel at certain points, allowing messages to be modified. An interceptor can even prevent a message from being sent to, or received from, a message channel.

All channel interceptors in Spring Integration implement the ChannelInterceptor interface, which contains the following methods:

  • Message<?> preSend(Message<?> message, MessageChannel channel)
    Invoked before a message is sent to a message channel.
    Allows for returning a modified message, or null to stop the message from being sent to the channel.
  • void postSend(Message<?> message, MessageChannel channel, boolean sent)
    Invoked after a message has been sent to a message channel. The boolean indicates whether the message was successfully sent.
  • void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex)
    Invoked after the sending of a message to a message channel has completed, regardless of any exception thrown during the send operation.
  • boolean preReceive(MessageChannel channel)
    Invoked before trying to receive a message from a pollable message channel. No message will be retrieved if this method returns false.
  • Message<?> postReceive(Message<?> message, MessageChannel channel)
    Invoked after a message has been received from a pollable message channel. Allows for returning a modified message, or null to stop any further interceptors from being called.
  • void afterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel, Exception ex)
    Invoked after the receiving of a message from a pollable message channel has completed, regardless of any exceptions thrown during the receive operation.

The following figure illustrates the order in which the different methods in a ChannelInterceptor are invoked when sending to and receiving from a pollable message channel. A pollable message channel has been chosen in order for all the methods of the interceptor to be invoked.

Sending and receiving from a pollable message channel with a channel interceptor.
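The invocation order can also be traced with a small stand-alone model that uses only the JDK. The types InterceptorSketch, RecordingInterceptorSketch and InterceptedQueueSketch are hypothetical and heavily simplified: messages are strings, the channel parameter is dropped, there is a single interceptor, and skipping postReceive when nothing was received is a choice made for this model. It is a sketch of the idea, not the framework's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified counterpart of the six interceptor methods. */
interface InterceptorSketch {
    String preSend(String message);
    void postSend(String message, boolean sent);
    void afterSendCompletion(String message, boolean sent, Exception ex);
    boolean preReceive();
    String postReceive(String message);
    void afterReceiveCompletion(String message, Exception ex);
}

/** Records the name of every method that gets invoked, in order. */
class RecordingInterceptorSketch implements InterceptorSketch {
    final List<String> calls = new ArrayList<>();

    @Override public String preSend(String message) { calls.add("preSend"); return message; }
    @Override public void postSend(String message, boolean sent) { calls.add("postSend"); }
    @Override public void afterSendCompletion(String message, boolean sent, Exception ex) {
        calls.add("afterSendCompletion");
    }
    @Override public boolean preReceive() { calls.add("preReceive"); return true; }
    @Override public String postReceive(String message) { calls.add("postReceive"); return message; }
    @Override public void afterReceiveCompletion(String message, Exception ex) {
        calls.add("afterReceiveCompletion");
    }
}

/** Hypothetical pollable channel that calls one interceptor around send and receive. */
class InterceptedQueueSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final InterceptorSketch interceptor;

    InterceptedQueueSketch(InterceptorSketch interceptor) {
        this.interceptor = interceptor;
    }

    boolean send(String message) {
        String toSend = interceptor.preSend(message);
        if (toSend == null) {
            return false; // the interceptor vetoed the send
        }
        boolean sent;
        try {
            sent = queue.offer(toSend);
            interceptor.postSend(toSend, sent);
        } catch (RuntimeException e) {
            interceptor.afterSendCompletion(toSend, false, e);
            throw e;
        }
        interceptor.afterSendCompletion(toSend, sent, null);
        return sent;
    }

    String receive() {
        if (!interceptor.preReceive()) {
            return null; // the interceptor vetoed the receive
        }
        String received;
        try {
            received = queue.poll();
            if (received != null) {
                received = interceptor.postReceive(received);
            }
        } catch (RuntimeException e) {
            interceptor.afterReceiveCompletion(null, e);
            throw e;
        }
        interceptor.afterReceiveCompletion(received, null);
        return received;
    }
}

class InterceptorOrderDemo {
    public static void main(String[] args) {
        RecordingInterceptorSketch recorder = new RecordingInterceptorSketch();
        InterceptedQueueSketch channel = new InterceptedQueueSketch(recorder);
        channel.send("hello");
        channel.receive();
        System.out.println(recorder.calls);
    }
}
```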

Logging and Counting Channel Interceptor

In order to show a code example with a channel interceptor, let’s implement an interceptor first. This channel interceptor will keep a count for each of the different methods in the ChannelInterceptor interface and also log the calls to the methods.

Nothing surprising about the above class; it implements the ChannelInterceptor interface and a method that logs a message. To keep track of the different counts, instances of AtomicInteger have been used in order to make the counting thread-safe.
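The counting technique itself does not depend on Spring. Below is a minimal stand-alone sketch using only the JDK; CountingHooksSketch is a hypothetical class that covers just two of the methods and takes strings instead of messages, whereas a real interceptor would implement ChannelInterceptor. It shows why AtomicInteger is a sensible choice when several threads may send to the same channel.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/** Hypothetical counting-and-logging hooks; thread-safe thanks to AtomicInteger. */
class CountingHooksSketch {
    private final AtomicInteger preSendCount = new AtomicInteger();
    private final AtomicInteger postSendCount = new AtomicInteger();

    String preSend(String message) {
        preSendCount.incrementAndGet();
        log("preSend", message);
        return message;
    }

    void postSend(String message, boolean sent) {
        postSendCount.incrementAndGet();
        log("postSend (sent=" + sent + ")", message);
    }

    int getPreSendCount() {
        return preSendCount.get();
    }

    int getPostSendCount() {
        return postSendCount.get();
    }

    private void log(String hook, String message) {
        System.out.println(hook + ": " + message);
    }
}

class CountingHooksDemo {
    public static void main(String[] args) {
        CountingHooksSketch hooks = new CountingHooksSketch();
        // Call the hooks from several threads at once; no increments are lost.
        IntStream.range(0, 100).parallel().forEach(i -> {
            String message = hooks.preSend("message " + i);
            hooks.postSend(message, true);
        });
        System.out.println("preSend count: " + hooks.getPreSendCount());
        System.out.println("postSend count: " + hooks.getPostSendCount());
    }
}
```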

Message Channel Interceptor Example

Using the logging and counting channel interceptor above, an example showing how to use message channel interceptors in Spring Integration can now be developed. The example can be found in the MessageChannelsCommonTests class:

Note that:

  • Having created the message channel, an instance of the LoggingAndCountingChannelInterceptor is created.
  • The message channel interceptor is added to the interceptors of the message channel using the addInterceptor method.
    It is possible to add multiple interceptors to a message channel and, if using the addInterceptor method that takes a single parameter, the last added interceptor will also be the interceptor that is invoked last in the chain of interceptors.
    There is also an addInterceptor method that takes two parameters; one index and a channel interceptor instance. This method will add the interceptor at the supplied index in the interceptor list.
  • Finally there are assertions verifying that each of the preSend, postSend and afterSendCompletion counters in the interceptor matches the number of messages sent to the message channel.
    As stated in the assertion messages, each of these methods is expected to be invoked once for each message sent to the message channel.
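The effect of the two ways of adding an interceptor on the invocation order can be modelled with a plain list. InterceptorChainSketch is a hypothetical stand-alone class using only the JDK; it reduces each interceptor to its preSend step, represented as a function from string to string, and is only meant to illustrate appending versus inserting at an index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical interceptor chain that only models the preSend step. */
class InterceptorChainSketch {
    private final List<UnaryOperator<String>> interceptors = new ArrayList<>();

    /** Appends the interceptor, so it is invoked after all previously added ones. */
    void addInterceptor(UnaryOperator<String> interceptor) {
        interceptors.add(interceptor);
    }

    /** Inserts the interceptor at the given position in the chain. */
    void addInterceptor(int index, UnaryOperator<String> interceptor) {
        interceptors.add(index, interceptor);
    }

    /** Applies every interceptor in list order; a null result stops the chain. */
    String applyPreSend(String message) {
        String current = message;
        for (UnaryOperator<String> interceptor : interceptors) {
            current = interceptor.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }
}

class InterceptorChainDemo {
    public static void main(String[] args) {
        InterceptorChainSketch chain = new InterceptorChainSketch();
        chain.addInterceptor(m -> m + " -> first added");
        chain.addInterceptor(m -> m + " -> second added");
        chain.addInterceptor(0, m -> m + " -> inserted at index 0");
        System.out.println(chain.applyPreSend("message"));
    }
}
```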

If the example is run, the test should pass and for each message there should be three lines of log output written to the console:

We can see that the three methods preSend, postSend and afterSendCompletion in the message channel interceptor are indeed invoked once for each message.

This concludes this first article on Spring Integration message channels!

Happy coding!
