Processing Infinitely Large Messages with Mule

By | December 1, 2013

I recently attended a very good talk by Angelika Langer about new features expected in Java 8. One of the things I found most interesting was streams, which add support for parallel processing and lazy evaluation of a sequence of objects.
I have, for some time, thought about processing large messages with Mule, and I feel that streams in Java 8 will give me a tool to do this.

In this example I will show my first experiment using Java 8 streams with Mule.

Prerequisites

To develop with Java 8 and Mule you will need Eclipse and a Java 8 JDK. Once both are downloaded, perform the following preparation steps:

  • In Eclipse, add the Java 8 JDK to the list of installed JREs and make it the default JRE.
    Preferences -> Java -> Installed JREs
  • Install the MuleStudio plugin in Eclipse.
  • Install the Mule 3.4.0 community edition runtime in Eclipse.
  • Create a Mule project in Eclipse.
    Ensure that the new project uses the Java 8 JDK and the Mule CE runtime.

Disabling Mule JDK Validation

If we were to start Mule on the Java 8 JDK now, it would fail with an exception. Among other things, Mule checks which version of the JRE/JDK it is running on, to make sure that it only runs on a supported version.
As pioneers we have to disable this check, since Java 8 is not supported yet:

  • In the example program project, create the org.mule.util package in src/main/java.
  • Locate the source-code for the org.mule.util.JdkVersionUtils class.
    Within Eclipse, use Open Type and enter the class name.
  • In the example project, create the class JdkVersionUtils in the package created earlier.
    Copy the source-code from step two to this class.
  • Modify the method getSupportedJdks in the JdkVersionUtils class in the example project so that it returns null.
  • In the project properties Java Build Path -> Order and Export, ensure that the Mule runtime is last, as shown in the following picture:

[Image: Java Build Path Eclipse Configuration]

Mule Configuration

The Mule configuration for the example project looks like this:

[Image: Mule Stream Processing Flow]

Notes:

  • I chose to have two flows to see whether it is possible to send a stream object from one flow to another.
  • I also experimented with sending a stream to more than one flow, which caused errors because the elements of the same stream were then consumed more than once. The conclusion I have reached so far is that Mule messages containing Java 8 streams must follow a processing path that is never split.
  • The first transformer in the first flow filters out integers whose value modulo 5 is not zero.
  • The second transformer in the first flow, which is the same as the first transformer in the second flow, multiplies each integer in a stream by 2.
  • The logger in the second flow logs the arrival of a Mule message.
    We will see that the contents of a stream are not processed at the same time as the Mule message carrying the stream through the Mule flow. This is because streams are evaluated lazily.
  • The final component in the second flow prints the integers in the stream.
    What is special about this component, as we will see, is that it does not consume the stream, so the printing is also evaluated lazily.
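
The single-consumption restriction mentioned in the notes above is a property of Java 8 streams themselves and can be reproduced without Mule. The following is a minimal sketch in plain Java; the class name StreamReuseDemo and its method are made up for illustration and are not part of the example project.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the example project.
class StreamReuseDemo {

    /** Returns true if using the same stream object a second time fails. */
    static boolean secondUseFails() {
        Stream<Integer> numbers = Stream.of(1, 2, 3);

        // First branch: links the stream to a pipeline and consumes it.
        List<Integer> firstBranch = numbers.map(n -> n * 2).collect(Collectors.toList());
        System.out.println("First branch: " + firstBranch);

        try {
            // Second branch: the same stream object has already been operated upon.
            numbers.filter(n -> n % 5 == 0).collect(Collectors.toList());
            return false;
        } catch (IllegalStateException e) {
            System.out.println("Second branch failed: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Second use fails: " + secondUseFails());
    }
}
```

This is the same situation as routing one stream payload to two flows: each flow would try to attach its own operations to a stream that can only be traversed once.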

The Mule configuration file XML looks like this:

Log4J Configuration File

In order to be able to see the message logged by the logging component we need to supply a Log4J configuration file.

  • In src/main/resources, create a file named log4j.properties with the following contents:

Mule Starter Class

Since I wanted to send a stream to the above Mule flows, I needed a starter class. I also found that it was impossible to start Mule by right-clicking the configuration file and selecting Run As -> Mule Application, since the JDK validation hack is then ignored. When Mule is started with a starter class, however, the JDK validation hack does take effect and I am able to run Mule 3.4.0 community edition on Java 8.

Integer Supplier

Since I wanted the stream of integers to be infinitely large, I used a supplier from which the stream generates its contents. In addition, I wanted to log the generation of each integer that is put into the stream. The integer supplier is implemented like this:
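
As a rough sketch of the idea (not the original listing), a logging supplier can be combined with Stream.generate to obtain an infinite stream. The class name LoggingIntegerSupplier, the log text, and the starting value 1 are assumptions made for this illustration.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical supplier; name, log text and starting value are assumptions.
class LoggingIntegerSupplier implements Supplier<Integer> {
    // Next integer to hand out. Not thread-safe: meant for sequential streams only.
    private int next = 1;

    @Override
    public Integer get() {
        int value = next++;
        System.out.println("Supplying integer: " + value);
        return value;
    }

    public static void main(String[] args) {
        // Stream.generate() calls the supplier on demand, so the stream has no end.
        Stream<Integer> infinite = Stream.generate(new LoggingIntegerSupplier());

        // limit() makes the otherwise endless stream safe to collect.
        List<Integer> firstThree = infinite.limit(3).collect(Collectors.toList());
        System.out.println(firstThree);
    }
}
```

Because the supplier is only invoked when a downstream operation asks for an element, creating the stream costs nothing; the log lines appear only once the stream is consumed.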

Mule Transformers and Component

Finally, the two custom transformers and the custom processing component are implemented as follows.

Stream Filter Modulo 5 Transformer

This Mule transformer removes from the integer stream in a Mule message every element whose value modulo 5 is not zero.
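
The stream operation at the heart of such a transformer can be sketched in plain Java. The Mule transformer plumbing is deliberately left out, and the class and method names below are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not the original transformer: only the stream step is shown.
class Mod5FilterSketch {

    /** Returns a stream that lets through only the integers divisible by 5. */
    static Stream<Integer> keepMultiplesOfFive(Stream<Integer> payload) {
        // filter() is an intermediate operation, so no element is examined here.
        return payload.filter(n -> n % 5 == 0);
    }

    public static void main(String[] args) {
        List<Integer> kept = keepMultiplesOfFive(Stream.of(3, 5, 7, 10, 12))
                .collect(Collectors.toList());
        System.out.println(kept); // prints [5, 10]
    }
}
```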

Stream Multiply Transformer

The next Mule transformer multiplies each integer in a stream by a configurable factor.
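
A plain-Java sketch of the multiplication step might look as follows. It assumes the factor is supplied once, when the object is created; the class name StreamMultiplierSketch is hypothetical and the Mule-specific code is omitted.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not the original transformer: only the stream step is shown.
class StreamMultiplierSketch {
    // Configurable factor, fixed at construction time in this sketch.
    private final int factor;

    StreamMultiplierSketch(int factor) {
        this.factor = factor;
    }

    /** Returns a stream in which every integer is multiplied by the factor. */
    Stream<Integer> apply(Stream<Integer> payload) {
        // map() is an intermediate operation: the multiplication happens on demand.
        return payload.map(n -> n * factor);
    }

    public static void main(String[] args) {
        List<Integer> doubled = new StreamMultiplierSketch(2)
                .apply(Stream.of(5, 10))
                .collect(Collectors.toList());
        System.out.println(doubled); // prints [10, 20]
    }
}
```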

Stream Printer Component

The stream printer component prints each integer in a stream without consuming the stream. Since the stream is not consumed and additional processing may be performed on the stream, the operation is also lazy as we will see when running the example program.
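
One way to print elements without consuming a stream is the intermediate operation Stream.peek. Whether the original component used peek is an assumption; the sketch below, with hypothetical names, only illustrates the behaviour described above.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not the original component: only the stream step is shown.
class StreamPrinterSketch {

    /** Attaches a printing action to the stream without consuming it. */
    static Stream<Integer> attachPrinter(Stream<Integer> payload, Consumer<Integer> printer) {
        // peek() is an intermediate operation: the printer runs only when
        // a later terminal operation pulls elements through the stream.
        return payload.peek(printer);
    }

    public static void main(String[] args) {
        Stream<Integer> withPrinter =
                attachPrinter(Stream.of(10, 20), n -> System.out.println("Printing: " + n));
        System.out.println("Printer attached, nothing printed yet");

        // Only now are the elements printed, one by one, as they are collected.
        List<Integer> result = withPrinter.collect(Collectors.toList());
        System.out.println(result);
    }
}
```

The returned stream can still be processed further, which is what allows the component to sit in the middle of a flow.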

Running the Example Program

We are now ready to run the example program by right-clicking the MuleStarter class in the Eclipse package or project explorer, selecting Run As -> Java Application.
There will be a lot of output to the console that will literally go on forever. In order to limit the console output, comment out line 40 in the MuleStarter class and remove the comment on the next line, line 41.

If we run the example program again, we will see the following output on the console:

Note that:

  • All the Mule transformers and components, including both the custom component we implemented earlier and the logger component, log messages telling us that they have received and finished processing the Mule message that the MuleStarter class sends. All this happens without a single integer having been processed.
  • It is not until the MuleStarter class has received the response message and starts pulling integers out of the stream in the response message, on line 59, that the processing methods and lambdas start logging messages to the console telling us which integers are being processed.
  • We can see that one integer, for instance number 5, passes through all the processing steps of the stream before the next integer is processed in the stream.
  • Some integers are filtered away by the StreamFilterMod5sTransformer and thus never reach subsequent processing steps in the stream.
  • What we in effect do by sending a Mule message with a stream payload through our Mule configuration is to create a sequence of operations that are to be applied to the stream. The operations are not applied to the stream until we start pulling data out of it.
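
The behaviour summarised in these notes can be reproduced without Mule. The sketch below chains the same kinds of operations (generate, filter modulo 5, multiply by 2, print) and records every step in a list, so that the order of events can be inspected. All names, the starting value 1, and the use of limit(2) to stop the infinite stream are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the example project.
class LazyPipelineDemo {

    /** Builds the chain of operations; every step records itself in 'events'. */
    static Stream<Integer> buildPipeline(List<String> events) {
        int[] counter = {0}; // mutable counter captured by the supplier lambda
        Supplier<Integer> supplier = () -> {
            int value = ++counter[0];
            events.add("supply " + value);
            return value;
        };
        return Stream.generate(supplier)
                .filter(n -> { events.add("filter " + n); return n % 5 == 0; })
                .map(n -> { events.add("multiply " + n); return n * 2; })
                .peek(n -> events.add("print " + n));
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Stream<Integer> pipeline = buildPipeline(events);

        // Corresponds to the Mule message having passed through all flows.
        System.out.println("Events after building the pipeline: " + events.size());

        // Corresponds to the starter class pulling integers out of the response.
        List<Integer> result = pipeline.limit(2).collect(Collectors.toList());
        System.out.println("Result: " + result);
        events.forEach(System.out::println);
    }
}
```

No event is recorded while the pipeline is being built. Once elements are pulled, each integer travels through all steps before the next one is supplied, and integers rejected by the filter never reach the multiply or print steps.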

This first introductory example ends here, but I plan to return to the subject and apply the above technique to a more realistic problem.
