Processing Infinitely Large Messages with Mule

By | December 1, 2013

I recently attended a very good talk by Angelika Langer about new features expected in Java 8. One of the things I found most interesting was streams, which allow for parallel processing and lazy evaluation of a sequence of objects.
I have, for some time, thought about processing large messages with Mule, and I feel that streams in Java 8 will give me a tool to do this.

In this example I will show my first experiment using Java 8 streams with Mule.

Prerequisites

To be able to develop with Java 8 and Mule you will need Eclipse and the Java 8 JDK.

Once you have downloaded Eclipse and the Java 8 JDK, perform the following preparation steps:

  • In Eclipse, add the Java 8 JDK to the list of installed JREs and make it the default JRE.
    Preferences -> Java -> Installed JREs
  • Install the MuleStudio plugin in Eclipse.
  • Install the Mule 3.4.0 community edition runtime in Eclipse.
  • Create a Mule project in Eclipse.
    Ensure that the new project uses the Java 8 JDK and the Mule CE runtime.

Disabling Mule JDK Validation

If we now were to try to start Mule on the Java 8 JDK, there would be an exception. Mule checks, among other things, which version of the JRE/JDK it is being run on to make sure that it is only being run on a supported version.
As pioneers we have to disable this check, since Java 8 is not supported yet:

  • In the example program project, create the org.mule.util package in src/main/java.
  • Locate the source-code for the org.mule.util.JdkVersionUtils class.
    Within Eclipse, use Open Type and enter the class name.
  • In the example project, create the class JdkVersionUtils in the package created earlier.
    Copy the source-code from step two to this class.
  • Modify the method getSupportedJdks in the JdkVersionUtils class in the example project so that it returns null.
  • In the project properties Java Build Path -> Order and Export, ensure that the Mule runtime is last, as shown in the following picture:

Java Build Path Eclipse Configuration

Mule Configuration

The Mule configuration for the example project looks like this:

Mule Stream Processing Flow

Notes:

  • I chose to have two flows to see whether it is possible to send a stream object from one flow to another.
  • I also experimented with sending a stream to more than one flow, which caused errors because the elements of the same stream would be consumed more than once. My conclusion so far is that a Mule message containing a Java 8 stream must follow a processing path that is never split.
  • The first transformer in the first flow filters out integers whose value modulo 5 is not zero.
  • The second transformer in the first flow, which is the same as the first transformer in the second flow, multiplies each integer in a stream by 2.
  • The logger in the second flow logs the arrival of a Mule message.
    We will see that the contents of a stream are not processed at the same time as the Mule message carrying the stream is processed by the flow. This is because streams are evaluated lazily.
  • The final component in the second flow prints the integers in the stream.
    What is special about this component, as we will see, is that it does not consume the stream and is evaluated lazily.
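The single-use restriction mentioned in the notes can be reproduced without Mule. The following is a minimal sketch in plain Java, not the code that failed in my Mule experiment; the class and method names are made up for illustration. The Stream documentation says that an implementation may throw IllegalStateException when it detects reuse of a stream, which is what the OpenJDK implementation does here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SingleUseStreamDemo {

    /** Returns true if a second terminal operation on the same stream is rejected. */
    static boolean secondConsumptionFails() {
        Stream<Integer> theStream = Arrays.asList(1, 2, 3).stream();

        // First terminal operation: consumes the stream.
        List<Integer> theFirstResult = theStream.collect(Collectors.toList());
        System.out.println("First consumer got: " + theFirstResult);

        try {
            // Second terminal operation on the very same stream object.
            theStream.forEach(x -> System.out.println("Second consumer got: " + x));
            return false;
        } catch (final IllegalStateException theException) {
            System.out.println("Second consumer failed: " + theException.getMessage());
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println("Second consumption rejected: " + secondConsumptionFails());
    }
}
```

A flow that hands the same stream object to two consumers ends up in the same situation as the second forEach above, which is why the message path must not be split.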

The Mule configuration file XML looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<mule
    xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans"
    version="CE-3.4.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-current.xsd
        
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd

        http://www.mulesoft.org/schema/mule/vm
        http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">

    <!-- Processing component that prints each number of a stream. -->
    <spring:beans>
        <spring:bean name="streamPrinter1" class="com.ivan.mule.StreamPrinter">
            <spring:property name="instanceId" value="1"/>
        </spring:bean>
    </spring:beans>
    
    <!-- Transformer that multiplies each integer in a stream by 2. -->
    <custom-transformer name="streamMultiply2" class="com.ivan.mule.StreamMultiplyTransformer">
        <spring:property name="multiplier" value="2"/>
    </custom-transformer>

    <flow name="FirstStreamProcessingFlow" doc:name="FirstStreamProcessingFlow">
        <vm:inbound-endpoint
            exchange-pattern="request-response" 
            path="streamProcessingService"
            doc:name="Stream Processing Endpoint"/>
        
        <custom-transformer
            class="com.ivan.mule.StreamFilterMod5sTransformer"
            doc:name="Filter Integers in Stream Which Mod 5 == 0"/>
        
        <transformer ref="streamMultiply2"
            doc:name="Multiply Integers in Stream by 2"/>

        <!-- Pass the stream on to another flow for further processing. -->
        <vm:outbound-endpoint
            exchange-pattern="request-response"
            path="otherStreamProcessingService"
            doc:name="Send to Other Flow"/>
    </flow>
    
    <flow name="SecondStreamProcessingFlow" doc:name="SecondStreamProcessingFlow">
        <vm:inbound-endpoint
            exchange-pattern="request-response" 
            doc:name="Other Stream Processing Endpoint"
            path="otherStreamProcessingService"/>
        
        <transformer
            ref="streamMultiply2"
            doc:name="Multiply Integers in Stream by 2"/>
        
        <logger
            message="Message reached logging component"
            level="ERROR" doc:name="Log Message Arrival"/>
        
        <!--
            Processing component that prints each integer in a stream.
        -->
        <component doc:name="Print Integers in Stream">
            <spring-object bean="streamPrinter1"/>
        </component>
    </flow>
</mule>

Log4J Configuration File

In order to be able to see the message logged by the logging component we need to supply a Log4J configuration file.

  • In src/main/resources, create a file named log4j.properties with the following contents:
log4j.rootLogger=WARN, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%d{MM-dd HH:mm:ss}] %-5p %c{1} [%t]: %m %n
log4j.logger.org.mule=INFO

Mule Starter Class

Since I want to send a stream to the above Mule flows, I need a starter class. I also found that it was impossible to start Mule by right-clicking the configuration file and selecting Run As -> Mule Application, since our JDK validation hack is then ignored. When starting Mule with a starter class, however, the JDK validation hack does take effect and I am able to run Mule 3.4.0 community edition on Java 8.

package com.ivan.mule;

import java.util.stream.Stream;

import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.config.ConfigurationException;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.config.spring.SpringXmlConfigurationBuilder;
import org.mule.context.DefaultMuleContextFactory;
import org.mule.module.client.MuleClient;

/**
 * Reads Mule configuration file and starts an instance of Mule that is
 * configured accordingly. This starter is also needed in order for the patched
 * version of the JdkVersionUtils class to come into effect, which it does not
 * if the Mule configuration file is launched using Run As -> Mule Application.
 * 
 * @author Ivan A Krizsan
 */
public class MuleStarter {
    /** Mule configuration files. */
    public final static String[] MULE3_CONFIG_FILES = { "Java8StreamsProcessinInMule.xml" };
    /** Array of integers used when creating small test-stream. */
    public final static Integer[] SMALL_INT_ARRAY = { 1, 2, 3, 4, 5, 6, 7, 8,
        9, 10 };

    public static void main(final String[] args) throws Exception {

        MuleContext theMuleContext = startMule(MULE3_CONFIG_FILES);

        /*
         * Create the infinite stream that is to be the payload.
         * The first version creates a stream of integers of infinite size.
         * The second line creates a stream of 10 integers. For testing
         * purposes.
         */
        Stream<Integer> theInboundStream = Stream.generate(new MyIntegerSupplier());
        //Stream<Integer> theInboundStream = Arrays.asList(SMALL_INT_ARRAY).stream();

        MuleMessage theMuleMessage = new DefaultMuleMessage(theInboundStream,
            theMuleContext);

        MuleClient theMuleClient = new MuleClient(theMuleContext);

        /* Send the message with the stream payload to the Mule flow. */
        MuleMessage theResponse = theMuleClient.send(
            "vm://streamProcessingService", theMuleMessage);
        Stream<Integer> theOutboundStream = (Stream<Integer>) theResponse
            .getPayload();

        /*
         * Output the contents of the stream received in the response from
         * the Mule flow.
         */
        theOutboundStream.forEach(x -> System.out.println("Response contains: " + x));
    }

    /**
     * Starts an instance of Mule using the supplied configuration files.
     * 
     * @param inConfigFiles Mule configuration files.
     * @return Mule context.
     * @throws ConfigurationException If there is an error in the supplied
     * configuration.
     * @throws InitialisationException If an error occurred during initialization.
     * @throws MuleException If starting the instance failed.
     */
    private static MuleContext startMule(final String[] inConfigFiles)
        throws ConfigurationException, InitialisationException, MuleException {
        DefaultMuleContextFactory theMuleContextFactory = new DefaultMuleContextFactory();
        SpringXmlConfigurationBuilder theSpringConfigBuilder = new SpringXmlConfigurationBuilder(
            inConfigFiles);
        MuleContext theMuleContext = theMuleContextFactory
            .createMuleContext(theSpringConfigBuilder);
        theMuleContext.start();

        return theMuleContext;
    }
}

Integer Supplier

Since I wanted the stream of integers to be infinitely large, I use a supplier that the stream uses to generate its contents. In addition, I also wanted to log the generation of an integer that is put into a stream. The integer supplier is implemented like this:

package com.ivan.mule;

import java.util.function.Supplier;

/**
 * A supplier that generates a sequence of integer numbers.
 * Implemented in a class of its own since an instance of this supplier
 * holds state.
 * 
 * @author Ivan Krizsan
 */
public class MyIntegerSupplier implements Supplier<Integer> {
    private Integer mNextNumber = 1;

    /* (non-Javadoc)
     * @see java.util.function.Supplier#get()
     */
    @Override
    public Integer get() {
        System.out.println("MyIntGenerator::get: " + mNextNumber);
        return mNextNumber++;
    }
}
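A supplier like the one above can also be tried out on its own, outside Mule. The sketch below is only an illustration under my own assumptions: CountingSupplier is a hypothetical stand-in for MyIntegerSupplier that counts invocations instead of printing, and firstN is a helper name I made up. It uses Stream.limit to take a bounded prefix of the otherwise infinite stream.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BoundedInfiniteStreamDemo {

    /** Hypothetical stand-in for MyIntegerSupplier that counts its invocations. */
    static class CountingSupplier implements Supplier<Integer> {
        private int mNextNumber = 1;
        private int mInvocations = 0;

        @Override
        public Integer get() {
            mInvocations++;
            return mNextNumber++;
        }

        int getInvocations() {
            return mInvocations;
        }
    }

    /** Pulls the first inCount elements out of an infinite, supplier-backed stream. */
    static List<Integer> firstN(final Supplier<Integer> inSupplier, final long inCount) {
        // limit is a short-circuiting operation, so the pipeline terminates
        // even though the source itself never ends.
        return Stream.generate(inSupplier)
            .limit(inCount)
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        CountingSupplier theSupplier = new CountingSupplier();
        System.out.println("First five: " + firstN(theSupplier, 5));
        System.out.println("Supplier invocations: " + theSupplier.getInvocations());
    }
}
```

Because the supplier holds state, this only gives a predictable sequence on a sequential stream, which is what Stream.generate returns by default.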

Mule Transformers and Component

Finally the three classes implementing the two custom transformers and the custom processing component are implemented as follows.

Stream Filter Modulo 5 Transformer

This Mule transformer filters out the elements of the integer stream in a Mule message whose value modulo 5 is not zero.

package com.ivan.mule;

import java.util.stream.Stream;

import org.mule.api.MuleMessage;
import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractMessageTransformer;

/**
 * A Mule transformer that filters out all numbers in an integer stream
 * whose value modulo 5 is not zero.
 * 
 * @author Ivan Krizsan
 */
public class StreamFilterMod5sTransformer extends AbstractMessageTransformer {

    @Override
    public Object transformMessage(final MuleMessage inMuleMessage,
        final String inInputEncoding)
            throws TransformerException {
        System.out.println("StreamFilterMod5sTransformer entering");

        Stream<Integer> theIntStream = (Stream<Integer>) inMuleMessage.getPayload();
        theIntStream = theIntStream.filter(StreamFilterMod5sTransformer::filterMod5s);
        inMuleMessage.setPayload(theIntStream);

        System.out.println("StreamFilterMod5sTransformer exiting");
        return inMuleMessage;
    }

    /**
     * Filters the supplied integer depending on whether its modulo 5 is zero.
     * Helper method for the Stream.filter method.
     * Prints a message to the console when invoked in order to make it
     * visible when this method is (lazily) invoked.
     * 
     * @param inInt The integer to filter.
     * @return True if the supplied integer's modulo 5 is zero, false otherwise.
     */
    private static boolean filterMod5s(final int inInt) {
        System.out.println("StreamFilterMod5sTransformer::filterMod5s: " + inInt);
        return (inInt % 5) == 0;
    }
}

Stream Multiply Transformer

The next Mule transformer multiplies each integer in a stream by a configurable factor.

package com.ivan.mule;

import java.util.stream.Stream;

import org.mule.api.MuleMessage;
import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractMessageTransformer;

/**
 * Transforms all numbers in an integer stream multiplying each using a
 * configurable multiplier.
 * 
 * @author Ivan Krizsan
 */
public class StreamMultiplyTransformer extends AbstractMessageTransformer {

    private Integer mMultiplier = 1;

    @Override
    public Object transformMessage(final MuleMessage inMuleMessage,
        final String inInputEncoding)
            throws TransformerException {
        System.out.println("StreamMultiplyTransformer entering");

        Stream<Integer> theIntStream = (Stream<Integer>) inMuleMessage.getPayload();
        theIntStream = theIntStream.map(this::multiply);
        inMuleMessage.setPayload(theIntStream);

        System.out.println("StreamMultiplyTransformer exiting");
        return inMuleMessage;
    }

    /**
     * Multiplies the supplied integer by the multiplier.
     * Helper method for the Stream.map method.
     * Prints a message to the console when invoked in order to make it
     * visible when this method is (lazily) invoked.
     * 
     * @param inInt Integer to multiply.
     * @return Product of supplied integer and multiplier.
     */
    private int multiply(final int inInt) {
        System.out.println("StreamMultiplyTransformer::multiply : " + inInt);
        return inInt * mMultiplier;
    }

    public Integer getMultiplier() {
        return mMultiplier;
    }

    public void setMultiplier(final Integer inMultiplier) {
        mMultiplier = inMultiplier;
    }
}

Stream Printer Component

The stream printer component prints each integer in a stream without consuming the stream. Since the stream is not consumed and additional processing may be performed on the stream, the operation is also lazy as we will see when running the example program.

package com.ivan.mule;

import java.util.stream.Stream;

import org.mule.api.MuleEventContext;
import org.mule.api.MuleMessage;
import org.mule.api.lifecycle.Callable;

/**
 * A Mule message processor that prints all the numbers in an integer stream.
 * The stream passes through the processor unchanged.
 * 
 * @author Ivan Krizsan
 */
public class StreamPrinter implements Callable {
    private String mInstanceId = "";

    @Override
    public Object onCall(final MuleEventContext inMuleEventContext)
        throws Exception {
        MuleMessage theMuleMessage = inMuleEventContext.getMessage();
        System.out.println("StreamPrinter " + mInstanceId + " entering");

        Stream<Integer> theIntStream = (Stream<Integer>)
            theMuleMessage.getPayload();
        System.out.println("StreamPrinter " + mInstanceId + " received stream: "
            + theIntStream);
        /*
         * If peek is used on the stream, a lazy stream will be produced
         * as a result and the action performed on each element of the stream
         * will be deferred until the elements of the stream are consumed.
         * This means that the stream can be passed on for further
         * processing.
         * If forEach is performed on the stream, an action will be performed
         * immediately on each element of the stream and the stream will be
         * consumed. The stream cannot be passed on for further processing,
         * since forEach is a terminal operation on the stream.
         */
        theIntStream = theIntStream.peek(
            x ->
            System.out.println("StreamPrinter::println (" + mInstanceId + "): " + x));

        theMuleMessage.setPayload(theIntStream);

        System.out.println("StreamPrinter " + mInstanceId + " exiting");
        return theMuleMessage;
    }

    public String getInstanceId() {
        return mInstanceId;
    }

    public void setInstanceId(final String inInstanceId) {
        System.out.println("\nStreamPrinter set instance id: " + inInstanceId);
        mInstanceId = inInstanceId;
    }
}
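The difference between peek and forEach described in the comment above can be seen in isolation with a few lines of plain Java. This is a small sketch with names of my own choosing, not part of the Mule example; it records events in a list so that the order in which things happen becomes visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class PeekVersusForEachDemo {

    /** Records, in order, what happens when peek is followed by forEach. */
    static List<String> run() {
        List<String> theEvents = new ArrayList<>();
        Stream<Integer> theStream = Arrays.asList(1, 2, 3).stream();

        // peek is an intermediate operation: the action is only registered here.
        Stream<Integer> thePeekedStream = theStream.peek(x -> theEvents.add("peek " + x));
        theEvents.add("after peek, before forEach");

        // forEach is a terminal operation: elements start flowing now and
        // the stream cannot be used again afterwards.
        thePeekedStream.forEach(x -> theEvents.add("forEach " + x));
        return theEvents;
    }

    public static void main(final String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first recorded event is "after peek, before forEach", followed by alternating peek and forEach entries for 1, 2 and 3. The peek action runs only when the terminal operation pulls elements through the pipeline.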

Running the Example Program

We are now ready to run the example program: right-click the MuleStarter class in the Eclipse package or project explorer and select Run As -> Java Application.
With the infinite stream there will be console output that literally goes on forever. To limit the output, comment out the line in the MuleStarter class that creates the stream with Stream.generate and uncomment the line below it, which creates a stream from SMALL_INT_ARRAY. The uncommented line also requires an import of java.util.Arrays.

If we run the example program again, we will see the following output on the console:

StreamFilterMod5sTransformer entering
StreamFilterMod5sTransformer exiting
StreamMultiplyTransformer entering
StreamMultiplyTransformer exiting
StreamMultiplyTransformer entering
StreamMultiplyTransformer exiting
[12-01 18:36:55] ERROR LoggerMessageProcessor [main]: Message reached logging component
StreamPrinter 1 entering
StreamPrinter 1 received stream: java.util.stream.ReferencePipeline$3@3daa82be
StreamPrinter 1 exiting
StreamFilterMod5sTransformer::filterMod5s: 1
StreamFilterMod5sTransformer::filterMod5s: 2
StreamFilterMod5sTransformer::filterMod5s: 3
StreamFilterMod5sTransformer::filterMod5s: 4
StreamFilterMod5sTransformer::filterMod5s: 5
StreamMultiplyTransformer::multiply : 5
StreamMultiplyTransformer::multiply : 10
StreamPrinter::println (1): 20
Response contains: 20
StreamFilterMod5sTransformer::filterMod5s: 6
StreamFilterMod5sTransformer::filterMod5s: 7
StreamFilterMod5sTransformer::filterMod5s: 8
StreamFilterMod5sTransformer::filterMod5s: 9
StreamFilterMod5sTransformer::filterMod5s: 10
StreamMultiplyTransformer::multiply : 10
StreamMultiplyTransformer::multiply : 20
StreamPrinter::println (1): 40
Response contains: 40

Note that:

  • All the Mule transformers and components, including both the custom component we implemented earlier and the logger component, log messages telling us that they have received and finished processing the Mule message that the MuleStarter class sends, all before a single integer has been processed.
  • It is not until the MuleStarter class has received the response message and starts pulling integers out of the stream in the response, in the forEach call at the end of its main method, that the processing methods and lambdas start logging messages to the console telling us which integers are being processed.
  • We can see that one integer, for instance number 5, passes through all the processing steps of the stream before the next integer is processed in the stream.
  • Some integers are filtered away by the StreamFilterMod5sTransformer and thus never reach subsequent processing steps in the stream.
  • What we in effect do by sending a Mule message with a stream payload through our Mule configuration is to create a sequence of operations that are to be applied to the stream. The operations are not applied to the stream until we start pulling data out of it.
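The observations above can be reproduced without Mule by modelling each transformer as a function that decorates a stream. The sketch below rests on my own assumptions: the stage factories filterMod5 and multiplyBy are hypothetical stand-ins for the transformers, and the event list replaces the console output. It applies the same three steps as the flows (filter, multiply by 2, multiply by 2) to the integers 1 to 10.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class StagedPipelineDemo {

    /** Hypothetical stand-in for the filter transformer; logs each tested integer. */
    static UnaryOperator<Stream<Integer>> filterMod5(final List<String> inLog) {
        return s -> s.filter(x -> {
            inLog.add("filter " + x);
            return x % 5 == 0;
        });
    }

    /** Hypothetical stand-in for the multiply transformer; logs each input integer. */
    static UnaryOperator<Stream<Integer>> multiplyBy(final int inFactor, final List<String> inLog) {
        return s -> s.map(x -> {
            inLog.add("multiply " + x);
            return x * inFactor;
        });
    }

    /** Passes a stream payload through all stages, then consumes it. */
    static List<Integer> run(final List<String> inLog) {
        Stream<Integer> thePayload = IntStream.rangeClosed(1, 10).boxed();
        List<UnaryOperator<Stream<Integer>>> theStages = Arrays.asList(
            filterMod5(inLog), multiplyBy(2, inLog), multiplyBy(2, inLog));

        // Each stage only attaches an operation to the stream; no element moves yet.
        for (UnaryOperator<Stream<Integer>> theStage : theStages) {
            thePayload = theStage.apply(thePayload);
        }
        inLog.add("all stages applied");

        // The terminal operation pulls the elements through every stage, one at a time.
        return thePayload.collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        List<String> theLog = new ArrayList<>();
        System.out.println("Result: " + run(theLog));
        theLog.forEach(System.out::println);
    }
}
```

The result is [20, 40], and the first log entry is "all stages applied". After that, the entries show integer 5 passing through both multiplications before integer 6 is even tested by the filter, just as in the console output from the Mule example.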

This first introductory example ends here, but I plan to return to the subject and apply the above technique to a more realistic problem.
