It’s been a long time since I started looking at Spring Integration 5 and after miscellaneous interventions, such as a book on the Spring Core certification, having moved and a job that provides a never-ending stream of surprises, I am finally back!
This article is an article in a series of articles introducing Spring Integration 5. I imagine most of the examples can be applied to earlier versions of Spring Integration as well, it just happens that I started out with Spring Integration 5 and I have little reason to investigate compatibility with previous versions.
If you are not familiar with Spring Integration 5 message channels, I recommend reading my previous article about general properties of message channels.
Examples Repository
All the examples in this series of articles are taken from my Spring Integration Scrutinized repository that is available on GitHub.
Subscribable Message Channels
A subscribable message channel is a message channel to which a number of subscribers, message consumers, can subscribe. The common behavior for a subscribable message channel when a message is sent to the message channel is to deliver the message to one single subscriber. There is an exception to this behavior in the PublishSubscribeChannel with which all subscribers to the message channel get a copy of each message sent to the PublishSubscribeChannel message channel.
Recall the following figure from the earlier introduction on message channels:

In order for a subscriber to have a chance to receive a message posted to a subscribable message channel, the subscriber must have subscribed to the message channel prior to the message being sent to the message channel. Subscribable message channels do not buffer messages.
In Spring Integration 5 the following message channel are subscribable message channels:
- DirectChannel
- ExecutorChannel
- FluxMessageChannel
- PublishSubscribeChannel
- FixedSubscriberChannel

Some words about the relationships of subscribable message channels:
- All subscribable message channels implement the MessageChannel interface, directly or indirectly.
- All subscribable message channels, except for FluxMessageChannel, implement the SubscribableChannel interface.
FluxMessageChannel implements the Publisher interface which do contain a subscribe method. The subscribe method in the Publisher interface has a different parameter type compared to the subscribe method in the SubscribableChannel interface. - All subscribable message channels, except for FluxMessageChannel, inherit from the class AbstractSubscribableChannel.
- All subscribable message channels, including FluxMessageChannel, inherit from AbstractMessageChannel, directly or indirectly.
Note that AbstractMessageChannel does not define any properties or behavior specific to subscribable message channels
Common Properties of Subscribable Message Channels
The main focus of this post is common properties of subscribable message channels which are defined in the SubscribableChannel interface and implemented in AbstractSubscribableChannel. The interface contains only two methods for managing subscribers of a subscribable message channel:
boolean subscribe(MessageHandler handler)
Adds a subscriber to the subscribers of the message channel.boolean unsubscribe(MessageHandler handler)
Removes a subscriber from the subscribers of the message channel.
The AbstractSubscribableChannel class, which is the parent of DirectChannel, PublishSubscribeChannel and ExecutorChannel, implements subscriber management and message sending for subscribable channels.
The following examples demonstrating common properties of subscribable message channels are taken from the class SubscribableChannelsTests.
No Subscribers
The first example shows how a subscribable message channel behaves when a message is sent to the message channel and there are no subscribers subscribed to the message channel.
/** * Tests creating a subscribable message channel and sending a message * to it without any subscribers being subscribed. * Expected result: * An exception should be thrown indicating that the message could * not be delivered. */ @Test public void noSubscribersTest() { final SubscribableChannel theSubscribableChannel; final Message<String> theInputMessage; theInputMessage = MessageBuilder .withPayload(GREETING_STRING) .build(); theSubscribableChannel = new DirectChannel(); /* * Give the message channel a name so that it will * appear in any related log messages. */ ((AbstractSubscribableChannel) theSubscribableChannel) .setBeanName("MessageChannelWithNoSubscribers"); Assertions.assertThrows(MessageDeliveryException.class, () -> theSubscribableChannel.send(theInputMessage)); }
Note that:
- A message is created.
- A subscribable message channel is created.
The concrete message channel type used is DirectChannel. - The component name is set on the message channel.
This will help us identify the message channel in log messages. The component name, if set, will be used to identify the message channel in log output. If the component name is not set, the bean name will be used. - The message is sent to the message channel.
The send-operation is expected to throw a MessageDeliveryException.
When run, the test passes without any output. To examine the exception thrown, the assertion assertThrows can be removed, in which case the test will fail with the following logged to the console (this is not the complete output):
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'MessageChannelWithNoSubscribers'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=Hello Integrated World!, headers={id=d6ef7494-9602-d880-8252-d5e64c9a2100, timestamp=1560162496453}], failedMessage=GenericMessage [payload=Hello Integrated World!, headers={id=d6ef7494-9602-d880-8252-d5e64c9a2100, timestamp=1560162496453}] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
We see that there is indeed a MessageDeliveryException thrown and that it is caused by the (subscribable) message channel ‘MessageChannelWithNoSubscribers’ not having any subscribers.
Single Subscriber
In the second example we will look at how to subscribe a single subscriber to a subscribable message channel and what happens when a message is sent to the message channel.
/** * Tests creating a subscribable message channel and subscribing one * subscriber to the channel. A message is then sent to the channel. * Expected result: * The single subscriber should receive the message sent to the * subscribable message channel. */ @Test public void singleSubscriberTest() { final SubscribableChannel theSubscribableChannel; final Message<String> theInputMessage; final List<Message> theSubscriberReceivedMessages = new CopyOnWriteArrayList<>(); theInputMessage = MessageBuilder .withPayload(GREETING_STRING) .build(); theSubscribableChannel = new DirectChannel(); /* * Give the message channel a name so that it will * appear in any related log messages. */ ((AbstractSubscribableChannel) theSubscribableChannel) .setBeanName("MessageChannelWithSingleSubscriber"); /* * Create a subscriber (message handler) that adds each received * message to a list. */ final MessageHandler theSubscriber = theSubscriberReceivedMessages::add; /* Register the subscriber with the subscribable message channel. */ final boolean theSubscribedFlag = theSubscribableChannel.subscribe(theSubscriber); Assertions.assertTrue(theSubscribedFlag); theSubscribableChannel.send(theInputMessage); /* Wait until the subscriber has received the message. */ await() .atMost(2, TimeUnit.SECONDS) .until(() -> !theSubscriberReceivedMessages.isEmpty()); /* * The subscriber that subscribed to the subscribable message * channel prior to the message was sent to the message channel * should receive the message. */ LOGGER.info("Subscriber received message: " + theSubscriberReceivedMessages.get(0)); Assertions.assertEquals( 1, theSubscriberReceivedMessages.size(), "A single message should have been received by the subscriber"); }
Note that:
- A subscribable message channel is created and named.
This is accomplished in exactly the same way as in the previous example. - A subscriber is created.
The subscriber is a MessageHandler. In this example, the message handler simply calls the add method on the theSubscriberReceivedMessages list. - The subscriber is subscribed to the subscribable message channel.
The subscribe method returns true if the subscriber was successfully subscribed to the message channel and false if the subscriber was already subscribed to the message channel. - A message is sent to the subscribable message channel.
- The test waits at most two seconds for the list of messages received by the subscriber to become non-empty.
- Verify that the subscriber has received exactly one message.
Running this example, we can see the following output in the console:
2019-06-11 19:10:12.368 INFO 7544 --- [ main] o.s.integration.channel.DirectChannel : Channel 'MessageChannelWithSingleSubscriber' has 1 subscriber(s). 2019-06-11 19:10:12.499 INFO 7544 --- [ main] s.i.s.m.s.SubscribableChannelsTests : Subscriber received message: GenericMessage [payload=Hello Integrated World!, headers={id=4e6e74ea-48f9-3b76-c200-c67e39177023, timestamp=1560251412365}]
We see that one subscriber was subscribed to the message channel and that the subscriber received a message with the string payload “Hello Integrated World!”, which is the payload of the message sent to the message channel.
Multiple Subscribers
The next example examines how subscribable message channels behaves when there are more than one subscriber subscribed to one and the same message channel.
/** * Tests creating a subscribable message channel and subscribing two * subscribers to the channel. A message is then sent to the channel. * Expected result: * One single message should be received by one of the subscribers * subscribed to the message channel. No message should be received by * the other subscriber. * Note! * This behaviour is not common for all subscribable message channels! * The {@code PublishSubscribeChannel} will publish a message sent * to the message channel to all of its subscribers. */ @Test public void multipleSubscribersTest() { final SubscribableChannel theSubscribableChannel; final Message<String> theInputMessage; final List<Message> theFirstSubscriberReceivedMessages = new CopyOnWriteArrayList<>(); final List<Message> theSecondSubscriberReceivedMessages = new CopyOnWriteArrayList<>(); theInputMessage = MessageBuilder .withPayload(GREETING_STRING) .build(); theSubscribableChannel = new DirectChannel(); /* * Give the message channel a name so that it will * appear in any related log messages. */ ((AbstractSubscribableChannel) theSubscribableChannel) .setBeanName("MessageChannelWithMultipleSubscribers"); /* * Create two subscribers (message handler) that adds each received * message to a list. */ final MessageHandler theFirstSubscriber = theFirstSubscriberReceivedMessages::add; final MessageHandler theSecondSubscriber = theSecondSubscriberReceivedMessages::add; /* Register the subscribers with the subscribable message channel. */ final boolean theFirstSubscribedFlag = theSubscribableChannel.subscribe(theFirstSubscriber); final boolean theSecondSubscribedFlag = theSubscribableChannel.subscribe(theSecondSubscriber); /* Verify that both subscribers have been successfully subscribed to the message channel. */ Assertions.assertTrue(theFirstSubscribedFlag); Assertions.assertTrue(theSecondSubscribedFlag); theSubscribableChannel.send(theInputMessage); await() .atMost(2, TimeUnit.SECONDS) .until(() -> (theFirstSubscriberReceivedMessages.size() > 0) || (theSecondSubscriberReceivedMessages.size() > 0)); /* * Only one subscriber of the direct message channel is expected * to receive the message sent. */ Assertions.assertEquals( 1, theFirstSubscriberReceivedMessages.size() + theSecondSubscriberReceivedMessages.size(), "Only one of the subscribers should have received the message sent"); }
Note that:
-
A
subscribable channel of the type DirectChannel is created and
named.
The type of the subscribable message channel is of importance, as there is one type of subscribable message channel, the PublishSubscribeChannel, that publishes messages sent to the message channel to all of its subscribers – more on this type of message channel later. - Two subscribers are created and both are subscribed to the subscribable message channel.
- A message is sent to the subscribable message channel.
- The both lists containing messages received by each of the subscribers are examined in order to assure that only one of the subscribers received the sent message.
Additional information for the curious:
The current implementation of DirectChannel uses a dispatcher of the type UnicastingDispatcher to dispatch messages to the subscribers of the message channel. This type of dispatcher dispatches messages sent to the message channel to one single subscriber only. The type of dispatcher used by a DirectChannel cannot be changed.
In addition, message dispatching is performed according to the RoundRobinLoadBalancingStrategy, if no other load-balancing strategy is specified when the instance of the DirectChannel is created.
Thus I could have chosen to verify that only the first subscriber received a message and the second subscriber received no messages, since this is what will be the result with the current implementation of DirectChannel. However this would rely on the message channel using a specific load-balancing strategy.
Unsubscribing
The last example illustrating common properties of subscribable message channels shows how a subscriber can be unsubscribed from a subscribable message channel. This does not apply to FluxMessageChannel message channels, since there is no unsunbscribe method in the FluxMessageChannel.
In addition, the PublishSubscribe message channel type will be used in the example so we will see that this type of message channel indeed publishes messages sent to the channel to all its subscribers.
/** * Tests creating a subscribable message channel that publishes * messages sent to the message channel to all subscribers * (a {@code PublishSubscribeChannel}) * and subscribing two subscribers to the channel. * Send a message to the message channel. * Unsubscribe one of the subscribers from the message channel. * Send another message to the message channel. * Expected result: * The first message should be received by both the subscribers. * The second message should be received only by the remaining * subscriber. */ @Test public void unsubscribeTest() { final SubscribableChannel theSubscribableChannel; final Message<String> theFirstInputMessage; final Message<String> theSecondInputMessage; final List<Message> theFirstSubscriberReceivedMessages = new CopyOnWriteArrayList<>(); final List<Message> theSecondSubscriberReceivedMessages = new CopyOnWriteArrayList<>(); /* * Create two subscribers (message handler) that adds each received * message to a list. */ final MessageHandler theFirstSubscriber = theFirstSubscriberReceivedMessages::add; final MessageHandler theSecondSubscriber = theSecondSubscriberReceivedMessages::add; /* Create two test messages. */ theFirstInputMessage = MessageBuilder .withPayload(GREETING_STRING + "1") .build(); theSecondInputMessage = MessageBuilder .withPayload(GREETING_STRING + "2") .build(); theSubscribableChannel = new PublishSubscribeChannel(); /* * Give the message channel a name so that it will * appear in any related log messages. */ ((AbstractSubscribableChannel) theSubscribableChannel) .setBeanName("MessageChannelToUnsubscribeFrom"); /* Register subscribers with the subscribable message channel. */ theSubscribableChannel.subscribe(theFirstSubscriber); theSubscribableChannel.subscribe(theSecondSubscriber); LOGGER.info("Number of subscribers before unsubscribe: " + ((AbstractSubscribableChannel) theSubscribableChannel) .getSubscriberCount()); /* Send the first message to the subscribable message channel. */ theSubscribableChannel.send(theFirstInputMessage); /* Unsubscribe the first subscriber from the message channel. */ theSubscribableChannel.unsubscribe(theFirstSubscriber); LOGGER.info("Number of subscribers after unsubscribe: " + ((AbstractSubscribableChannel) theSubscribableChannel) .getSubscriberCount()); /* Send the second message to the subscribable message channel. */ theSubscribableChannel.send(theSecondInputMessage); /* * Wait until at least one of the subscribers have received * two messages. */ await() .atMost(2, TimeUnit.SECONDS) .until(() -> (theFirstSubscriberReceivedMessages.size() == 2) || (theSecondSubscriberReceivedMessages.size() == 2)); LOGGER.info("First subscriber received messages: " + theFirstSubscriberReceivedMessages); LOGGER.info("Second subscriber received messages: " + theSecondSubscriberReceivedMessages); /* The first subscribers should have received one message. */ Assertions.assertEquals( 1, theFirstSubscriberReceivedMessages.size(), "First subscriber should have received one message"); /* The second subscriber should have received two messages. */ Assertions.assertEquals( 2, theSecondSubscriberReceivedMessages.size(), "Second subscriber should have received two messages"); }
Note that:
- Similar to what we have seen in the previous example, two subscribers that store received messages in lists are created.
- Two messages are created with different payloads.
- A PublishSubscribeChannel message channel is created and named.
The reason for using a PublishSubscribeChannel is that I wanted each message that is sent to the message channel to be published to all the subscribers subscribed to the message channel. This makes it easier to determine whether a subscriber was subscribed to the message channel or not when a certain message was sent. - Both the subscribers are subscribed to the message channel.
- The first test message is sent to the message channel.
- The first subscriber is unsubscribed from the message channel.
- The second test message is sent to the message channel.
- Wait until at least one of the subscribers have received two messages.
- Verify that the first subscriber has received one message and that the second subscriber has received two messages.
The reason for this is of course that the first subscriber only was subscribed to the message channel when the first message was sent and that the second subscriber was subscribed to the message channel both when the first and when the second test messages were sent to the message channel.
When running this test, the following console output can be observed:
2019-06-12 17:36:56.154 INFO 11344 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'MessageChannelToUnsubscribeFrom' has 1 subscriber(s). 2019-06-12 17:36:56.154 INFO 11344 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'MessageChannelToUnsubscribeFrom' has 2 subscriber(s). 2019-06-12 17:36:56.155 INFO 11344 --- [ main] s.i.s.m.s.SubscribableChannelsTests : Number of subscribers before unsubscribe: 2 2019-06-12 17:36:56.155 INFO 11344 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'MessageChannelToUnsubscribeFrom' has 1 subscriber(s). 2019-06-12 17:36:56.155 INFO 11344 --- [ main] s.i.s.m.s.SubscribableChannelsTests : Number of subscribers after unsubscribe: 1 2019-06-12 17:36:56.257 INFO 11344 --- [ main] s.i.s.m.s.SubscribableChannelsTests : First subscriber received messages: [GenericMessage [payload=Hello Integrated World!1, headers={id=b89bf288-9b3f-1d5a-249f-7a6f7040aad8, timestamp=1560332216154}]] 2019-06-12 17:36:56.258 INFO 11344 --- [ main] s.i.s.m.s.SubscribableChannelsTests : Second subscriber received messages: [GenericMessage [payload=Hello Integrated World!1, headers={id=b89bf288-9b3f-1d5a-249f-7a6f7040aad8, timestamp=1560332216154}], GenericMessage [payload=Hello Integrated World!2, headers={id=8b80f4d9-708d-5f2d-719e-95654336a087, timestamp=1560332216154}]]
We can see how the number of subscribers change as the two subscribers subscribe to the message channel and then as one of the subscribers unsubscribe. We also see that the first subscriber received the first message and that the second subscriber received both the first and the second messages.
The FluxMessageChannel, which is the only subscribable message channel that do not implement the SubscribableChannel interface, does not allow for unsubscribing.
This concludes this post on the common properties of subscribable message channels. In subsequent posts each of the subscribable message channels will be examined in detail.
Happy coding!