Scheduled Reading of Multiple Files with Mule – Part Two

November 24, 2013

Finally, I am back for part two of Scheduled Reading of Multiple Files with Mule. If you haven’t read part one, I recommend that you do so before reading this post.

Group Files Read on One Occasion

In part one, we managed to schedule the reading of one or more files from an inbox directory. To extend the example program, we are going to group all the files read on one occasion so that the fileReaderFlow always produces a single Mule message. The second version of this example’s Mule configuration looks like this:

Second version of Mule configuration

Solution Outline

At the heart of the solution that groups the files read on one occasion lies the Mule collection aggregator, which in itself is simple and straightforward. For the aggregator to work, we just need to set two special properties on each message to be aggregated: a correlation id and a correlation group size. In addition, the collection aggregator can be configured with a timeout and with whether or not it is to fail when timing out.

Since we cannot know beforehand how many files will be present in the inbox directory, the correlation group size is set to a high number, well above the largest expected number of files. Because the group size is set this high, the collection aggregator will eventually time out. The timeout on the aggregator needs to be shorter than the interval at which the reading of files is scheduled. Finally, we need to configure the aggregator not to fail when timing out.

What remains is the correlation id, which is the tricky part of this example. We want the flow that schedules the reading of files to set a correlation id that is later used by the flow that reads files from the inbox directory. In Mule, there is no easy way to convey information from one flow to another if the first flow does not send a message to the second flow. To accomplish this, a singleton Spring bean is made responsible for maintaining a correlation id, and two transformers are created: one that retrieves the current correlation id and one that increments it.

Create Correlation Id Bean Class

As described above, the correlation id Spring bean is responsible for maintaining a correlation id that can be incremented from one Mule flow and retrieved from another Mule flow. All manipulation of the correlation id is located in this class, both to simplify the implementation of the transformers that increment and retrieve the correlation id and to make access to the correlation id threadsafe.

  • In src/main/java of our example project, create the com.ivan.mule package.

  • In the new package, create a Java class named FileGroupCorrelationIdBean with the implementation supplied in the listing below:

Note that:

  • The class FileGroupCorrelationIdBean does not inherit from any other class nor does it implement any interface.

  • There is a class variable named LOGGER. This logger object is used to log retrieval and incrementation of the correlation id in the methods of this class.

  • There is an instance variable named mCorrelationId which is of the type AtomicLong. This instance variable holds the current correlation id. An AtomicLong is used to make retrieval and manipulation of the correlation id threadsafe.

  • There is a method named getCorrelationId. This method retrieves the current correlation id and logs a message to the console.

  • There is a method named incrementCorrelationId. This method increments the correlation id and logs a message to the console.

  • Nothing in this class prevents creation of multiple instances of the class. The configuration of the Spring bean that we will see later will ensure that there is one single instance of this class in the Mule application.
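As a rough illustration of the notes above, a class along these lines would fit the description. This is a minimal sketch, not the original listing: the use of java.util.logging, the starting value of 0 and the long return types are assumptions, and the package declaration (com.ivan.mule in the example project) is left out so that the sketch compiles on its own.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

/**
 * Sketch of a correlation id holder that can be shared between flows.
 * In the example project this class would be placed in the com.ivan.mule package.
 */
class FileGroupCorrelationIdBean {
    // Assumption: java.util.logging stands in for whichever logging API the project uses.
    private static final Logger LOGGER =
        Logger.getLogger(FileGroupCorrelationIdBean.class.getName());

    // AtomicLong makes reads and increments threadsafe without explicit locking.
    // Assumption: the correlation id starts at 0.
    private final AtomicLong mCorrelationId = new AtomicLong(0);

    /** Returns the current correlation id without changing it. */
    public long getCorrelationId() {
        final long theCorrelationId = mCorrelationId.get();
        LOGGER.info("getCorrelationId: " + theCorrelationId);
        return theCorrelationId;
    }

    /** Atomically increments the correlation id and returns the new value. */
    public long incrementCorrelationId() {
        final long theCorrelationId = mCorrelationId.incrementAndGet();
        LOGGER.info("incrementCorrelationId, new value: " + theCorrelationId);
        return theCorrelationId;
    }
}
```

Because both methods delegate to a single AtomicLong, a thread that reads the id while another thread increments it sees either the old or the new value, never something in between.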

Create the Increment Correlation Id Transformer

I have chosen to implement a Mule transformer to increment the correlation id in order to use a standard Mule component type and to create a component that can be tested in a unit test.

  • In the package com.ivan.mule, create a Java class named IncrementCorrelationIdTransformer:

 

Note that:

  • The class IncrementCorrelationIdTransformer extends the Mule class AbstractMessageTransformer. Mule transformers inheriting from AbstractMessageTransformer receive the current Mule message, not only the current message payload. This allows for manipulation of message properties in addition to manipulation of the message payload. While this transformer manipulates neither the message payload nor the message properties, I usually choose this superclass for my transformers in case I need access to message properties later.

  • The class contains an instance variable named mCorrelationIdBean that is of the type FileGroupCorrelationIdBean. This instance variable will hold a reference to the single instance of the correlation id bean class created in the previous section.

  • There is a method named transformMessage. This method implements the abstract method in the superclass and performs the transformation work. In this example, we only invoke the incrementCorrelationId method on the correlation id bean and return the incoming message unaltered.

  • There is one getter method and one setter method for the mCorrelationIdBean instance variable. The setter method allows us to inject a reference to the correlation id bean using Spring configuration. The getter method is not used in this example.

Create the Retrieve Correlation Id Transformer

With the same motivation as for the increment correlation id transformer, that is to use a standard Mule component type that can be unit-tested, we create a Mule transformer that retrieves the current correlation id.

  • In the package com.ivan.mule, create a Java class named RetrieveCorrelationIdTransformer implemented as in the listing below:

The notes regarding the increment correlation id transformer in the previous section also apply to this transformer. In addition, note that:

  • In the transformMessage method, a correlation id is retrieved from the correlation id bean and the string representation of the value is stored in an invocation property on the Mule message. The reason for this is that the collection aggregator we will use in a little while expects the correlation id to be a string.
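The core of what the two transformers do can be sketched without Mule on the classpath. In the sketch below, a plain Map stands in for the invocation properties of a Mule message, and the class names CorrelationIdHolder and CorrelationIdSteps are hypothetical stand-ins for the correlation id bean and for the bodies of the two transformMessage methods. Only the property name “CorrelationId” is taken from this example.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the shared correlation id bean; only the counter matters here. */
class CorrelationIdHolder {
    private final AtomicLong mCorrelationId = new AtomicLong(0);

    long get() {
        return mCorrelationId.get();
    }

    long increment() {
        return mCorrelationId.incrementAndGet();
    }
}

/** Hypothetical stand-in for the logic inside the two transformers. */
class CorrelationIdSteps {
    static final String CORRELATION_ID_PROPERTY = "CorrelationId";

    private final CorrelationIdHolder mHolder;

    CorrelationIdSteps(final CorrelationIdHolder inHolder) {
        mHolder = inHolder;
    }

    /** Increment step: bumps the id and leaves the message untouched. */
    void startNewBatch() {
        mHolder.increment();
    }

    /**
     * Retrieve step: stores the current id as a String property.
     * The Map is an assumption standing in for invocation-scoped message properties.
     */
    void tagMessage(final Map<String, Object> inInvocationProperties) {
        inInvocationProperties.put(
            CORRELATION_ID_PROPERTY, String.valueOf(mHolder.get()));
    }
}
```

Every message tagged between two calls to startNewBatch receives the same string value, which is what later allows the files of one batch to be grouped together.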

Update the Mule Configuration File

With the correlation id bean and the two associated transformers in place, we are now ready to modify the Mule configuration file. The Mule configuration file is of course the file with the name “quartztriggeredfileinboundendpoint.xml” in the src/main/app directory in our example project. Below, I list the entire updated version of the file with comments indicating the sections that have been added:

The following comments are only on the added sections of the Mule configuration file. The other parts remain unchanged and have been described earlier.

Note that:

  • In the first added section, a Spring bean named “fileGroupCorrelationIdBean” is configured. The implementation of this Spring bean is the correlation id bean class we implemented earlier. As discussed earlier, the scope of the bean is singleton, which is required in order for the two transformers defined below the bean to use one and the same instance of the bean.

  • Also in the first added section, there are two global Mule transformers defined. These are the two transformers which we implemented earlier: the first, with the name “incrementCorrelationIdTransformer”, increments the correlation id and the second, named “retrieveCorrelationIdTransformer”, retrieves the correlation id. The Spring bean defined above the transformers is injected into both transformers.

  • In the second added section, located in the “quartzTriggerFlow” flow, a transformer has been added. The transformer references the global transformer “incrementCorrelationIdTransformer” defined earlier. As the name implies and as we have seen when implementing the transformer, this transformer increments the correlation id. Thus, immediately before a new batch of files is read from the inbox directory, the correlation id is incremented so that each batch of files is grouped in a separate group.

  • The final added section in the “fileReaderFlow” flow is the largest addition and the remainder of the remarks concern this section.

  • A transformer has been added, which references the global transformer “retrieveCorrelationIdTransformer”. As we have seen earlier, this transformer retrieves the correlation id from the Spring bean defined earlier in the Mule configuration file. Each file that is read from the inbox directory will cause this transformer to be executed and a correlation id to be retrieved. As long as the correlation id is unchanged, that is, not incremented by the “quartzTriggerFlow” flow, this transformer will retrieve the same value. Recall that the transformer stores the correlation id in a Mule message property named “CorrelationId” in the invocation scope.

  • After having retrieved the correlation id, there are two property transformers. The first property transformer retrieves the correlation id from the Mule message property mentioned above and stores it in a Mule message property with the name “MULE_CORRELATION_ID”. This is the property that the collection aggregator we will see next uses to determine whether a message belongs to the current group of messages. The second property transformer sets the Mule message property named “MULE_CORRELATION_GROUP_SIZE” to a fixed value (10000). This property is used by the collection aggregator to determine the maximum number of messages that are to be grouped. More details about both these properties below!

  • The next addition is the collection aggregator, which has already been mentioned several times. This Mule component is responsible for collecting a number of messages, up to the number specified by the “MULE_CORRELATION_GROUP_SIZE” Mule message property we have seen earlier. If the collection aggregator holds the maximum number of messages and another message arrives, a message containing the maximum number of messages specified by this property will be sent out and a new group of messages will be started. The aggregator also uses the “MULE_CORRELATION_ID” Mule message property to determine whether a message belongs to the current group of messages being aggregated or whether to start a new group of messages.

  • An alternative to setting the “MULE_CORRELATION_ID” and “MULE_CORRELATION_GROUP_SIZE” message properties is to use the setCorrelationId and setCorrelationGroupSize methods on Mule messages.

  • The collection aggregator is able to maintain multiple collections of messages when aggregating messages. This means that a group of messages correlated on one correlation id will not automatically be sent out because a message with another correlation id arrives. Instead the aggregator will wait until either the correlation group size is reached or the message aggregation times out, whichever happens first.

  • There is a timeout attribute on the collection aggregator. This is a timeout in milliseconds after which the collection aggregator will stop aggregating messages. What happens when aggregation stops is determined by the failOnTimeout attribute.

  • There is a failOnTimeout attribute on the collection aggregator. When message aggregation times out and the failOnTimeout attribute is true, an exception is thrown and the messages aggregated up to that point in time are discarded. If the failOnTimeout attribute is false when aggregation times out, as in this example, the messages aggregated up to that point in time are sent out by the aggregator as if the aggregation had ended normally.

  • The logger component after the collection aggregator just logs the fact that a number of files have been aggregated. This component will not be invoked until the collection aggregator sends out a message.

  • The final logger component logs the number of files that were aggregated in the message sent out by the collection aggregator.
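To make the interplay between correlation id, group size, timeout and failOnTimeout more concrete, here is a simplified model in plain Java. It is not Mule's implementation: the class name SimpleCollectionAggregator is hypothetical, time is passed in explicitly to keep the behaviour deterministic, the timeout is assumed to be measured from the first message of a group, and a group is assumed to be released as soon as its size is reached.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Simplified, hypothetical model of correlation-based aggregation. */
class SimpleCollectionAggregator {
    private final int mGroupSize;
    private final long mTimeoutMillis;
    private final boolean mFailOnTimeout;
    // Open groups keyed by correlation id; several groups may be open at once.
    private final Map<String, List<String>> mGroups = new LinkedHashMap<>();
    // Time at which each open group received its first message.
    private final Map<String, Long> mGroupStart = new LinkedHashMap<>();

    SimpleCollectionAggregator(
        final int inGroupSize, final long inTimeoutMillis, final boolean inFailOnTimeout) {
        mGroupSize = inGroupSize;
        mTimeoutMillis = inTimeoutMillis;
        mFailOnTimeout = inFailOnTimeout;
    }

    /** Adds a message to its group; returns the group if the group size was reached. */
    Optional<List<String>> add(
        final String inCorrelationId, final String inPayload, final long inNowMillis) {
        mGroupStart.putIfAbsent(inCorrelationId, inNowMillis);
        final List<String> theGroup =
            mGroups.computeIfAbsent(inCorrelationId, theKey -> new ArrayList<>());
        theGroup.add(inPayload);
        if (theGroup.size() >= mGroupSize) {
            return Optional.of(close(inCorrelationId));
        }
        return Optional.empty();
    }

    /**
     * Handles groups whose timeout has passed. With failOnTimeout set, the first
     * timed-out group is discarded and an exception is thrown; otherwise the
     * timed-out groups are released as if they had completed normally.
     */
    List<List<String>> expire(final long inNowMillis) {
        final List<List<String>> theReleased = new ArrayList<>();
        for (final String theId : new ArrayList<>(mGroupStart.keySet())) {
            if (inNowMillis - mGroupStart.get(theId) >= mTimeoutMillis) {
                final List<String> theGroup = close(theId);
                if (mFailOnTimeout) {
                    throw new IllegalStateException(
                        "Aggregation timed out for group " + theId);
                }
                theReleased.add(theGroup);
            }
        }
        return theReleased;
    }

    /** Removes a group from the open groups and returns its messages. */
    private List<String> close(final String inCorrelationId) {
        mGroupStart.remove(inCorrelationId);
        return mGroups.remove(inCorrelationId);
    }
}
```

With a group size of 10000 and failOnTimeout set to false, as in this example, calls to add never complete a group in practice, so every batch of files leaves the aggregator through the timeout path.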

Run the Second Version of the Example Program

As when starting the first version of the example program, right-click the example program project in the IDE and select Run As → Mule Application. A Mule instance should start up without errors. After some time, there should be log output in the console similar to this:

The output is similar to what we saw when running the first version of the example program, with the exception of the correlation id being incremented.

Process One File

As when running the first version of the example program, we are going to drop a single file into the inbox directory that appears in the example project in the IDE and observe the behaviour of our example program.

  • Wait until the last output in the console reads “Stopped reading files”.

  • Copy one file to the inbox directory.

Observe the console output. You should see output similar to the following in the console:

Note that:

  • The correlation id is incremented at the start of each file-reading round. The corresponding console output is, for instance, “incrementCorrelationId, new value: 3”.

  • Each time a file is read, the current correlation id is retrieved. Corresponding console output example: “getCorrelationId: 3”

  • When the collection aggregator times out, the number of aggregated files is reported. Corresponding console output example: “Received number of files: 1”

Process Multiple Files

It seems that the new version of the scheduled file reading application performs as expected. Before giving a final verdict, let’s try it with multiple files.

  • Clear the IDE console. Right-click anywhere in the console and select Clear.

  • Select a number of files. Remember the number of files selected, since we are going to verify that all files were indeed processed.

  • Wait until the last output in the console reads “Stopped reading files”.

  • Copy the files to the inbox directory.

  • Observe the console output. You should see something similar to this:

Note that:

  • There is a log entry “Read a file” for each file read. This is the same as with the first version of the program.

  • After each “Read a file”, the current correlation id is retrieved. Remember that in order for all the files read on the same scheduled occasion to be grouped together, the same correlation id needs to be set for each file.

  • After all the files have been read, or actually, after the collection aggregator has timed out, there is a message saying “Files have been aggregated” logged.

  • Finally, the number of files read at this particular scheduled occasion, which are now contained in the current Mule message, is logged to the console.
