Schedule Reading of Multiple Files with Mule – Part Two

By | November 24, 2013

Finally I am back for part two of Scheduled Reading of Multiple Files with Mule. If you haven’t read part one, I recommend that you do so before reading this post.

Group Files Read at One Occasion

In part one, we managed to schedule the reading of one or more files from an inbox directory. To further extend the example program, we are going to group all the files read at one occasion so that the fileReaderFlow always produces one single Mule message per scheduled reading. The second version of this example’s Mule configuration looks like this:

Second version of Mule configuration

Solution Outline

At the heart of the solution that groups the files read at one single occasion lies the Mule collection aggregator, which in itself is simple and straightforward. For the aggregator to work, we only need to set two special properties on each message to be aggregated: a correlation id and a correlation group size. In addition, the collection aggregator can be configured to time out after a certain amount of time, and with whether or not it is to fail when timing out.

Since we cannot know beforehand how many files will be present in the inbox directory, the correlation group size will be set to a high number, well above the largest expected number of files. Because the group size is then never reached, the collection aggregator will always end up timing out. The aggregator timeout therefore needs to be shorter than the interval at which the reading of files is scheduled, and the aggregator must be configured not to fail when timing out.

What remains is the correlation id, which is the tricky part of this example. We want the flow that schedules the reading of files to set a correlation id that is later used by the flow that reads files from the inbox directory. In Mule, there is no easy way to convey information from one flow to another if the first flow does not send a message to the second flow. To accomplish this, a singleton Spring bean is made responsible for maintaining a correlation id, and two transformers are created: one that retrieves the current correlation id and one that increments it.
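To make the idea concrete, here is a minimal plain-Java model of the correlation scheme, using nothing from Mule or Spring: a shared counter is incremented once per scheduled round, each file read during that round is tagged with the counter's current value, and files are grouped by that tag. The class and method names are hypothetical and only illustrate the principle; in the real application the collection aggregator does the grouping.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Single-threaded model of the correlation scheme (not Mule code).
 * Names are hypothetical; the grouping map is deliberately not thread-safe.
 */
class CorrelationGroupingModel {
    /** Starts at 1, like the correlation id bean in this example. */
    private final AtomicLong correlationId = new AtomicLong(1L);
    /** Files grouped by the correlation id that was current when they were read. */
    private final Map<Long, List<String>> groups = new LinkedHashMap<>();

    /** Called by the "scheduler" at the start of each round. */
    void startRound() {
        correlationId.incrementAndGet();
    }

    /** Called by the "file reader" for each file it picks up. */
    void fileRead(String fileName) {
        long id = correlationId.get();
        groups.computeIfAbsent(id, k -> new ArrayList<>()).add(fileName);
    }

    Map<Long, List<String>> groups() {
        return groups;
    }

    public static void main(String[] args) {
        CorrelationGroupingModel model = new CorrelationGroupingModel();
        model.startRound();            // id becomes 2, inbox empty this round
        model.startRound();            // id becomes 3
        model.fileRead("a.txt");
        model.fileRead("b.txt");
        model.startRound();            // id becomes 4
        model.fileRead("c.txt");
        System.out.println(model.groups()); // {3=[a.txt, b.txt], 4=[c.txt]}
    }
}
```

A round in which no file is read simply produces no group, which is why id 2 is missing from the printed map.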

Create Correlation Id Bean Class

As outlined above, the correlation id Spring bean is responsible for maintaining a correlation id that can be incremented from one Mule flow and retrieved from another Mule flow. All manipulation of the correlation id is located in this class, both to simplify the implementation of the transformers that increment and retrieve the correlation id and to make access to the correlation id threadsafe.

  • In src/main/java of our example project, create the com.ivan.mule package.

  • In the new package, create a Java class named FileGroupCorrelationIdBean with the implementation supplied in the listing below:
package com.ivan.mule;

import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class maintains a correlation id that can be incremented by one
 * flow and read by another.
 * In this particular example, the correlation id is used to correlate
 * a number of files read from a directory at one particular, scheduled,
 * point in time.
 * In order for this bean to function as expected, it must be a singleton
 * bean.
 * An instance of this class is threadsafe.
 * 
 * @author Ivan Krizsan
 */
public class FileGroupCorrelationIdBean {
    /* Constant(s): */
    /** Class logger. */
    private static final Logger LOGGER = LoggerFactory
        .getLogger(FileGroupCorrelationIdBean.class);

    /* Instance variable(s): */
    /** Correlation id. */
    private AtomicLong mCorrelationId = new AtomicLong(1L);

    /**
     * Retrieves the current correlation id.
     * 
     * @return Current correlation id.
     */
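    public long getCorrelationId() {
        /* Read the value once, so that the logged and returned ids agree. */
        final long theCorrelationId = mCorrelationId.get();
        LOGGER.error("*** getCorrelationId: {}", theCorrelationId);
        return theCorrelationId;
    }

    /**
     * Increments the current correlation id.
     */
    public void incrementCorrelationId() {
        /* Log the value produced by this increment, not a later re-read. */
        final long theNewCorrelationId = mCorrelationId.incrementAndGet();
        LOGGER.error("*** incrementCorrelationId, new value: {}",
            theNewCorrelationId);
    }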
}

Note that:

  • The class FileGroupCorrelationIdBean neither inherits from any other class nor implements any interface.

  • There is a class variable named LOGGER. This logger object is used to log retrieval and incrementation of the correlation id in the methods of this class.

  • There is an instance variable named mCorrelationId which is of the type AtomicLong. This instance variable holds the current correlation id. An AtomicLong is used to make retrieval and manipulation of the correlation id threadsafe.

  • There is a method named getCorrelationId. This method retrieves the current correlation id and logs a message to the console.

  • There is a method named incrementCorrelationId. This method increments the correlation id and logs a message to the console.

  • Nothing in this class prevents creation of multiple instances of the class. The configuration of the Spring bean that we will see later will ensure that there is one single instance of this class in the Mule application.
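The thread-safety claim above can be checked with a small experiment. The sketch below increments an AtomicLong from several threads and shows that no increment is lost; it uses a bare AtomicLong instead of FileGroupCorrelationIdBean so that it runs without SLF4J on the classpath, and the class name is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Demonstrates that concurrent AtomicLong increments are not lost (hypothetical class). */
class AtomicCounterDemo {
    /**
     * Starts a counter at 1 (like the bean), increments it from several
     * threads and returns the final value.
     */
    static long incrementConcurrently(int threads, int incrementsPerThread) {
        AtomicLong counter = new AtomicLong(1L);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // 4 threads x 10,000 increments on top of the initial value 1
        System.out.println(incrementConcurrently(4, 10_000)); // 40001
    }
}
```

With a plain long field and the ++ operator instead, concurrent increments can be lost, so the final value could come out lower than expected.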

Create the Increment Correlation Id Transformer

I have chosen to implement a Mule transformer to increment the correlation id, in order to use a standard Mule component type and to create a component that can be tested in a unit test.

  • In the package com.ivan.mule, create a Java class named IncrementCorrelationIdTransformer:

package com.ivan.mule;

import org.mule.api.MuleMessage;
import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractMessageTransformer;

/**
 * Transformer that increments the correlation id maintained by
 * bean injected into the transformer.
 * Messages pass through unmodified.
 * 
 * @author Ivan Krizsan
 */
public class IncrementCorrelationIdTransformer extends
    AbstractMessageTransformer {
    /** Singleton bean maintaining the correlation id. */
    private FileGroupCorrelationIdBean mCorrelationIdBean;

    @Override
    public Object transformMessage(final MuleMessage inMessage,
        final String inOutputEncoding) throws TransformerException {

        mCorrelationIdBean.incrementCorrelationId();
        return inMessage;
    }

    public FileGroupCorrelationIdBean getCorrelationIdBean() {
        return mCorrelationIdBean;
    }

    public void setCorrelationIdBean(
        final FileGroupCorrelationIdBean inCorrelationIdBean) {
        mCorrelationIdBean = inCorrelationIdBean;
    }
}


Note that:

  • The class IncrementCorrelationIdTransformer extends the Mule class AbstractMessageTransformer. Mule transformers inheriting from AbstractMessageTransformer receive the current Mule message, not only the current message payload. This allows for manipulation of message properties, in addition to manipulation of the message payload. While this transformer manipulates neither the message payload nor the message properties, I usually choose this superclass for my transformers in case I need access to message properties later.

  • The class contains an instance variable named mCorrelationIdBean that is of the type FileGroupCorrelationIdBean. This instance variable will hold a reference to the single instance of the correlation id bean class created in the previous section.

  • There is a method named transformMessage. This method implements the abstract method in the superclass and performs the transformation work. In this example, we only invoke the incrementCorrelationId method on the correlation id bean and return the incoming message unaltered.

  • There is one getter- and one setter-method for the mCorrelationIdBean instance variable. The setter-method allows us to inject a reference to the correlation id bean using Spring configuration. The getter-method is not used in this example.
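Because the transformer depends on the bean only through a setter, its logic can be exercised without a running Mule instance. The sketch below imitates that setter injection by hand with hypothetical stand-in classes (CounterBean, IncrementStep and RetrieveStep are not part of the example project) and also shows why both transformers must share one bean instance.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for FileGroupCorrelationIdBean, without logging. */
class CounterBean {
    private final AtomicLong id = new AtomicLong(1L);
    long get() { return id.get(); }
    void increment() { id.incrementAndGet(); }
}

/** Stand-in for IncrementCorrelationIdTransformer: bumps the id, passes the payload through. */
class IncrementStep {
    private CounterBean bean;
    void setBean(CounterBean bean) { this.bean = bean; } // setter injection
    Object apply(Object payload) {
        bean.increment();
        return payload; // unchanged, like the Mule transformer
    }
}

/** Stand-in for RetrieveCorrelationIdTransformer: reads the current id. */
class RetrieveStep {
    private CounterBean bean;
    void setBean(CounterBean bean) { this.bean = bean; } // setter injection
    long currentId() { return bean.get(); }
}

class WiringDemo {
    public static void main(String[] args) {
        // One shared instance, which is what scope="singleton" provides.
        CounterBean shared = new CounterBean();
        IncrementStep inc = new IncrementStep();
        RetrieveStep ret = new RetrieveStep();
        inc.setBean(shared);
        ret.setBean(shared);
        inc.apply("Triggered!");
        System.out.println(ret.currentId()); // 2

        // Separate instances: the reading side never sees the increment.
        IncrementStep inc2 = new IncrementStep();
        RetrieveStep ret2 = new RetrieveStep();
        inc2.setBean(new CounterBean());
        ret2.setBean(new CounterBean());
        inc2.apply("Triggered!");
        System.out.println(ret2.currentId()); // 1
    }
}
```

Spring's default bean scope is singleton, so the explicit scope="singleton" in the configuration mainly documents the intent; with prototype scope, each injection would receive its own instance, as in the second half of the sketch.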

Create the Retrieve Correlation Id Transformer

With the same motivation as for the increment correlation id transformer, that is to use a standard Mule component type that can be unit-tested, we create a Mule transformer that retrieves the current correlation id.

  • In the package com.ivan.mule, create a Java class named RetrieveCorrelationIdTransformer implemented as in the listing below:

package com.ivan.mule;

import org.mule.api.MuleMessage;
import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractMessageTransformer;

/**
 * Transformer that retrieves the correlation id maintained by
 * bean injected into the transformer and stores it in a message property.
 * Any existing values in the message property will be overwritten.
 * Apart from this, messages passing through this transformer are unchanged.
 * 
 * @author Ivan Krizsan
 */
public class RetrieveCorrelationIdTransformer extends AbstractMessageTransformer {
    /* Constant(s): */
    /** Name of message property in which to store the correlation id. */
    private final static String CORRELATIONID_PROPERTYNAME = "CorrelationId";

    /* Instance variable(s): */
    /** Singleton bean maintaining the correlation id. */
    private FileGroupCorrelationIdBean mCorrelationIdBean;

    @Override
    public Object transformMessage(final MuleMessage inMessage,
        final String inOutputEncoding) throws TransformerException {

        /*
         * Our correlation id will be used as the Mule message correlation
         * id, which needs to be a string, so we store our id as a string.
         */
        final long theCorrelationId = mCorrelationIdBean.getCorrelationId();
        inMessage.setInvocationProperty(CORRELATIONID_PROPERTYNAME,
            String.valueOf(theCorrelationId));

        return inMessage;
    }

    public FileGroupCorrelationIdBean getCorrelationIdBean() {
        return mCorrelationIdBean;
    }

    public void setCorrelationIdBean(
        final FileGroupCorrelationIdBean inCorrelationIdBean) {
        mCorrelationIdBean = inCorrelationIdBean;
    }
}

The notes regarding the increment correlation id transformer in the previous section also apply to this transformer. In addition, note that:

  • In the transformMessage method, a correlation id is retrieved from the correlation id bean and the string representation of the value is stored in an invocation property on the Mule message. The reason for this is that the collection aggregator we will use shortly expects the correlation id to be a string.
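The sketch below traces the correlation id through fileReaderFlow, modelling Mule message properties as plain maps. This is an assumption made for illustration and not the Mule API; it further assumes that set-property writes outbound-scoped properties, as it does in Mule 3. The class name PropertyFlowSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Models the property hand-over in fileReaderFlow with plain maps (hypothetical). */
class PropertyFlowSketch {
    /** Stands in for invocation-scoped properties (the "CorrelationId" property). */
    final Map<String, Object> invocationProperties = new HashMap<>();
    /** Stands in for the properties written by the set-property elements. */
    final Map<String, Object> outboundProperties = new HashMap<>();

    /** Mirrors RetrieveCorrelationIdTransformer: store the numeric id as a String. */
    void retrieveCorrelationId(long currentId) {
        invocationProperties.put("CorrelationId", Long.toString(currentId));
    }

    /** Mirrors the two set-property elements in the configuration. */
    void setCorrelationProperties() {
        outboundProperties.put("MULE_CORRELATION_ID",
            invocationProperties.get("CorrelationId"));
        // Stored as the literal string from the XML attribute value="10000".
        outboundProperties.put("MULE_CORRELATION_GROUP_SIZE", "10000");
    }

    public static void main(String[] args) {
        PropertyFlowSketch message = new PropertyFlowSketch();
        message.retrieveCorrelationId(3L);
        message.setCorrelationProperties();
        System.out.println(message.outboundProperties.get("MULE_CORRELATION_ID")); // 3
    }
}
```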

Update the Mule Configuration File

With the correlation id bean and the two associated transformers in place, we are now ready to modify the Mule configuration file. The Mule configuration file is, of course, the file named “quartztriggeredfileinboundendpoint.xml” in the src/main/app directory of our example project. Below, I list the entire updated version of the file, with comments indicating the sections that have been added:

<?xml version="1.0" encoding="UTF-8"?>
<mule
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" 
    xmlns:quartz="http://www.mulesoft.org/schema/mule/quartz" 
    xmlns:file="http://www.mulesoft.org/schema/mule/file"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans"
    xmlns:test="http://www.mulesoft.org/schema/mule/test"
    version="CE-3.4.0" 
    xsi:schemaLocation="http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/quartz http://www.mulesoft.org/schema/mule/quartz/current/mule-quartz.xsd
http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd">
    
    <!-- Start new section. -->
    <!--
        Singleton Spring bean that maintains a correlation id used when
        reading files.
    -->
    <spring:beans>
        <spring:bean
            name="fileGroupCorrelationIdBean"
            class="com.ivan.mule.FileGroupCorrelationIdBean"
            scope="singleton"/>
    </spring:beans>
    
    <!--
        Globally declared transformer that increments the correlation id
        maintained by the Spring bean above.
    -->
    <custom-transformer
        name="incrementCorrelationIdTransformer"
        class="com.ivan.mule.IncrementCorrelationIdTransformer">
        <spring:property name="correlationIdBean" ref="fileGroupCorrelationIdBean"/>
    </custom-transformer>
    
    <!--
        Globally declared transformer that retrieves the correlation id
        maintained by the Spring bean above.
    -->
    <custom-transformer
        name="retrieveCorrelationIdTransformer"
        class="com.ivan.mule.RetrieveCorrelationIdTransformer">
        <spring:property name="correlationIdBean" ref="fileGroupCorrelationIdBean"/>
    </custom-transformer>
    <!-- End new section. -->
    
    <!--
        Non-streaming file connector that deletes files after reading.
        No polling frequency is specified, since we want to read all
        available files at each occasion as fast as possible and
        there will be a Quartz job that schedules reading of the files.
        Note that this file connector will be started and stopped according
        to schedule, so it should not be used with other file inbound endpoints.
    -->
    <file:connector
        name="nonStreamingFileConnector"
        streaming="false"
        autoDelete="true"/>

    <!--
        Quartz scheduler connector that only allows for one single thread
        to run scheduled jobs.
        In my case I want to limit the number of threads that can be created,
        in case something goes wrong with the job.
        Depending on what is to be scheduled, the maximum number of threads
        to run jobs can be adjusted.
    -->
    <quartz:connector name="oneThreadQuartzConnector">
        <quartz:factory-property key="org.quartz.threadPool.threadCount" value="1"/>
    </quartz:connector>
    
    <!--
        This flow executes once when the Mule application starts up
        and stops the file connector that is to be used when periodically
        reading data from a directory.
    -->
    <flow name="stopFileConnectorFlow">
        <!--
            A one-time trigger that executes after 1 ms.
        -->
        <quartz:inbound-endpoint
            jobName="EventGeneratorJob1"
            connector-ref="oneThreadQuartzConnector"
            repeatCount="0"
            repeatInterval="10000"
            startDelay="1">
            <quartz:event-generator-job>
                <quartz:payload>Triggered!</quartz:payload>
            </quartz:event-generator-job>
        </quartz:inbound-endpoint>
        
        <!--
            Stop the file connector reading files if it is started.
            Groovy is used in order to make sure the connector to be stopped
            is not already stopped.
        -->
        <scripting:component>
            <scripting:script engine="groovy">
                <scripting:text><![CDATA[
                    def theFileConnector = registry.'nonStreamingFileConnector'
                    if (theFileConnector.isStarted()) {
                        theFileConnector.stop()
                    }
                ]]></scripting:text>
            </scripting:script>
        </scripting:component>
        
        <logger level="ERROR" message="*** Stopped file connector"/>
    </flow>

    <!--
        This flow schedules and triggers the reading of the file(s)
        in a directory.
        The flow is asynchronous so that it just triggers subsequent actions
        without waiting for a response.
    -->
    <flow name="quartzTriggerFlow">
        <!--
            After an initial 10 second delay, generates a message each 30
            seconds with specified payload. Infinite repetition.
            Uses the one-thread Quartz connector defined above.
            In this application, the message is used to trigger subsequent
            actions in the flow and the payload does not matter.
            If the flow is synchronous, the response timeout on the quartz
            inbound endpoint must be set to give the file reading flow enough
            time to read many files and/or large files.
        -->
        <quartz:inbound-endpoint
            jobName="EventGeneratorJob2"
            connector-ref="oneThreadQuartzConnector"
            repeatInterval="30000"
            repeatCount="-1"
            startDelay="10000">
            <quartz:event-generator-job>
                <quartz:payload>Triggered!</quartz:payload>
            </quartz:event-generator-job>
        </quartz:inbound-endpoint>
        
        <logger level="ERROR" message="*** Started reading files"/>
        
        <!-- Start new section. -->
        <!--
            Increment the correlation id maintained in the correlation id bean.
        -->
        <transformer ref="incrementCorrelationIdTransformer"/>
        <!-- End new section. -->

        <!--
            Start the file reading connector and wait some time to give it
            time to read and process files.
        -->
        <scripting:component>
            <scripting:script engine="groovy">
                <scripting:text><![CDATA[
                    def theFileConnector = registry.'nonStreamingFileConnector'
                    if (!theFileConnector.isStarted()) {
                        theFileConnector.start()
                    }
                    
                    java.lang.Thread.sleep(10000)
                ]]></scripting:text>
            </scripting:script>
        </scripting:component>
        
        <logger level="ERROR" message="*** About to stop the file connector"/>
        
        <!--
            Stop the file connector if it is started.
        -->
        <scripting:component>
            <scripting:script engine="groovy">
                <scripting:text><![CDATA[
                    def theFileConnector = registry.'nonStreamingFileConnector'
                    if (theFileConnector.isStarted()) {
                        theFileConnector.stop()
                    }
                ]]></scripting:text>
            </scripting:script>
        </scripting:component>
        
        <logger level="ERROR" message="*** Stopped reading files"/>
    </flow>
    
    <!--
        The file reader flow that is responsible for reading and processing 
        files from a designated directory.
        As an alternative to starting and stopping the file connector,
        this entire flow can be started and stopped. In such a case,
        set the initialState attribute on the flow to "stopped" and
        start and stop the flow similar to how the file connector
        is started and stopped.
    -->
    <flow name="fileReaderFlow">
        <file:inbound-endpoint
            path="inbox"
            connector-ref="nonStreamingFileConnector">
        </file:inbound-endpoint>
        
        <logger level="ERROR" message="*** Read a file"/>
        
        <!-- Start new section. -->
        <!--
            Retrieve the correlation id maintained in the correlation id bean.
        -->
        <transformer ref="retrieveCorrelationIdTransformer"/>
        
        <!--
            Setting a correlation id and correlation group size for the
            file aggregation.
            The correlation id is the one retrieved by the transformer above.
            The correlation group size needs to be set above the maximum
            number of expected files at one single occasion, in order for all
            the files at one single occasion to be read and correlated into
            the same group.
        -->
        <set-property propertyName="MULE_CORRELATION_ID" 
            value="#[header:invocation:CorrelationId]"/>
        <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="10000"/>
        
        <!--
            Aggregator that aggregates all the files read at one occasion
            into one and the same Mule message containing a list of files.
            The timeout needs to be set high enough to allow for reading of
            the largest number of files expected.
            The failOnTimeout attribute needs to be set to false, in order
            for the aggregator not to throw an exception upon timeout.
        -->
        <collection-aggregator timeout="5000" failOnTimeout="false"/>
        
        <logger level="ERROR" message="*** Files have been aggregated"/>
        
        <logger level="ERROR"
          message="#[string:*** Received number of files: #[message.payload.size()]]"/>
        <!-- End new section. -->
    </flow>
</mule>
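The configuration relies on a few numeric relationships between its values. The hypothetical helper below checks them: the aggregator timeout must be shorter than the schedule interval (as stated in the solution outline), the correlation group size must exceed the largest expected number of files, and, as an additional assumption of mine, the reading window (the Thread.sleep in quartzTriggerFlow) should end before the next scheduled round. The maximum of 1000 files used in main is an assumed value.

```java
/** Hypothetical consistency check for the timing values in the configuration above. */
class TimingCheck {
    static boolean isConsistent(long repeatIntervalMs, long readWindowMs,
            long aggregatorTimeoutMs, int groupSize, int maxExpectedFiles) {
        // The aggregator must time out before the next round starts.
        boolean timeoutBeforeNextRound = aggregatorTimeoutMs < repeatIntervalMs;
        // Assumption: the connector should be stopped before the next round.
        boolean windowBeforeNextRound = readWindowMs < repeatIntervalMs;
        // The group size must never be reached, so that the timeout releases the group.
        boolean groupSizeNeverReached = groupSize > maxExpectedFiles;
        return timeoutBeforeNextRound && windowBeforeNextRound && groupSizeNeverReached;
    }

    public static void main(String[] args) {
        // repeatInterval="30000", Thread.sleep(10000), timeout="5000",
        // MULE_CORRELATION_GROUP_SIZE=10000, assumed maximum of 1000 files
        System.out.println(isConsistent(30_000, 10_000, 5_000, 10_000, 1_000)); // true
    }
}
```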

The following comments are only on the added sections of the Mule configuration file. The other parts remain unchanged and have been described earlier.

Note that:

  • In the first added section, a Spring bean named “fileGroupCorrelationIdBean” is configured. The implementation class of this Spring bean is the correlation id bean class we implemented earlier. As discussed earlier, the scope of the bean is singleton, which is required in order for the two transformers defined below the bean to use one and the same instance of the bean.

  • Also in the first added section, there are two global Mule transformers defined. These are the two transformers we implemented earlier: the first, named “incrementCorrelationIdTransformer”, increments the correlation id and the second, named “retrieveCorrelationIdTransformer”, retrieves it. The Spring bean defined above the transformers is injected into both transformers.

  • In the second added section, located in the “quartzTriggerFlow” flow, a transformer has been added. The transformer references the global transformer “incrementCorrelationIdTransformer” defined earlier. As the name implies and as we have seen when implementing the transformer, this transformer increments the correlation id. Thus immediately before reading a new batch of files from the inbox directory, the correlation id is incremented in order for each batch of files to be grouped in a separate group.

  • The final added section in the “fileReaderFlow” flow is the largest addition and the remainder of the remarks concern this section.

  • A transformer has been added, which references the global transformer “retrieveCorrelationIdTransformer”. As we have seen earlier, this transformer retrieves the correlation id from the Spring bean defined earlier in the Mule configuration file. Each file that is read from the inbox directory causes this transformer to be executed and a correlation id to be retrieved. As long as the correlation id is unchanged, that is, not incremented by the “quartzTriggerFlow” flow, the correlation id retrieved by this transformer will have the same value. Recall that the transformer stores the correlation id in a Mule message property named “CorrelationId” in the invocation scope.

  • After having retrieved the correlation id, there are two property transformers. The first property transformer retrieves the correlation id from the Mule message property mentioned above and stores it in a Mule message property with the name “MULE_CORRELATION_ID”. This is the property that the collection aggregator we will see next uses to determine whether a message belongs to the current group of messages. The second property transformer sets the Mule message property named “MULE_CORRELATION_GROUP_SIZE” to a fixed value (10000). This property is used by the collection aggregator to determine the maximum number of messages that are to be grouped. More details about both these properties below!

  • The next addition is the collection aggregator, which has already been mentioned several times. This Mule component is responsible for collecting messages that share the same “MULE_CORRELATION_ID” value, up to the number specified by the “MULE_CORRELATION_GROUP_SIZE” Mule message property we have seen earlier. Once a group contains that number of messages, the aggregator considers it complete and sends out one single message containing all the messages of the group. The aggregator uses the “MULE_CORRELATION_ID” Mule message property to determine which group a message belongs to, and starts a new group when a message arrives with a correlation id it has not seen before.

  • An alternative to setting the “MULE_CORRELATION_ID” and “MULE_CORRELATION_GROUP_SIZE” message properties is to use the setCorrelationId and setCorrelationGroupSize methods on Mule messages.

  • The collection aggregator is able to maintain multiple collections of messages when aggregating messages. This means that a group of messages correlated on one correlation id will not automatically be sent out because a message with another correlation id arrives. Instead the aggregator will wait until either the correlation group size is reached or the message aggregation times out, whichever happens first.

  • There is a timeout attribute on the collection aggregator. This is a timeout in milliseconds after which the collection aggregator will stop aggregating messages. What happens when aggregation stops is determined by the failOnTimeout attribute.

  • There is a failOnTimeout attribute on the collection aggregator. When message aggregation times out and the failOnTimeout attribute is true, an exception is thrown and the messages aggregated up to that point in time are discarded. If the failOnTimeout attribute is false when aggregation times out, as in this example, the messages aggregated up to that point in time are sent out by the aggregator as if the aggregation had ended normally.

  • The logger component after the collection aggregator just logs the fact that a number of files have been aggregated. This component will not be invoked until the collection aggregator sends out a message.

  • The final logger component logs the number of files that were aggregated in the message sent out by the collection aggregator.
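The aggregator behaviour described in these notes can be summarised in a simplified, single-threaded model. This is NOT Mule's implementation: time is passed in explicitly so that the model is deterministic, the class and method names are hypothetical, and counting the timeout from the arrival of a group's first message is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of a collection aggregator (hypothetical, not Mule code). */
class AggregatorModel {
    private final long timeoutMs;
    private final boolean failOnTimeout;
    /** Several groups, keyed by correlation id, can be open at the same time. */
    private final Map<String, List<String>> openGroups = new HashMap<>();
    private final Map<String, Long> groupStart = new HashMap<>();

    AggregatorModel(long timeoutMs, boolean failOnTimeout) {
        this.timeoutMs = timeoutMs;
        this.failOnTimeout = failOnTimeout;
    }

    /** Adds a message; returns the completed group, or null while still collecting. */
    List<String> add(String correlationId, int groupSize, String payload, long nowMs) {
        List<String> group = openGroups.computeIfAbsent(correlationId, k -> new ArrayList<>());
        groupStart.putIfAbsent(correlationId, nowMs); // timeout counted from first message
        group.add(payload);
        if (group.size() >= groupSize) {              // group size reached: release at once
            groupStart.remove(correlationId);
            return openGroups.remove(correlationId);
        }
        return null;
    }

    /** Releases (or fails) every group whose timeout has expired at time nowMs. */
    List<List<String>> expire(long nowMs) {
        List<List<String>> released = new ArrayList<>();
        for (String id : new ArrayList<>(openGroups.keySet())) {
            if (nowMs - groupStart.get(id) >= timeoutMs) {
                List<String> group = openGroups.remove(id);
                groupStart.remove(id);
                if (failOnTimeout) {
                    // Collected messages are dropped and an error is raised.
                    throw new IllegalStateException("Correlation timeout for group " + id);
                }
                released.add(group);                  // sent out as if complete
            }
        }
        return released;
    }

    public static void main(String[] args) {
        AggregatorModel agg = new AggregatorModel(5_000, false);
        agg.add("3", 10_000, "a.txt", 0);
        agg.add("3", 10_000, "b.txt", 20);
        System.out.println(agg.expire(4_999)); // []
        System.out.println(agg.expire(5_000)); // [[a.txt, b.txt]]
    }
}
```

With the group size of 10000 used in this example, add never completes a group in practice, so expire with failOnTimeout=false is what releases each batch of files.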

Run the Second Version of the Example Program

As with the first version of the example program, right-click the example program project in the IDE and select Run As → Mule Application. A Mule instance should start up without errors. After some time, there should be log output in the console similar to this:

16:20:30,257 [[quartztriggeredfileinboundendpoint].stopFileConnectorFlow.stage1.02] ERROR *** Stopped file connector
16:20:40,058 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Started reading files
16:20:40,058 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** incrementCorrelationId, new value: 2
16:20:50,129 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** About to stop the file connector
16:20:50,129 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Stopped reading files
16:21:10,060 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Started reading files
16:21:10,060 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** incrementCorrelationId, new value: 3
16:21:20,071 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** About to stop the file connector
16:21:20,071 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Stopped reading files

The output is similar to what we saw when running the first version of the example program, except that the correlation id is now incremented at the start of each file-reading round.

Process One File

As when running the first version of the example program, we are going to drop one single file into the inbox directory that appears in the example project in the IDE and observe the behaviour of our example program.

  • Wait until the last output in the console reads “Stopped reading files”.

  • Copy one file to the inbox directory.

Observe the console output. You should see output similar to the following in the console:

16:22:45,209 [[quartztriggeredfileinboundendpoint].stopFileConnectorFlow.stage1.02] ERROR *** Stopped file connector
16:22:55,020 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Started reading files
16:22:55,020 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** incrementCorrelationId, new value: 2
16:23:05,091 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** About to stop the file connector
16:23:05,091 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Stopped reading files
16:23:25,022 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Started reading files
16:23:25,022 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** incrementCorrelationId, new value: 3
16:23:26,043 [[quartztriggeredfileinboundendpoint].fileReaderFlow.stage1.02] ERROR *** Read a file
16:23:26,043 [[quartztriggeredfileinboundendpoint].fileReaderFlow.stage1.02] ERROR *** getCorrelationId: 3
16:23:31,054 [[quartztriggeredfileinboundendpoint].fileReaderFlow.event.correlator] ERROR *** Files have been aggregated
16:23:31,095 [[quartztriggeredfileinboundendpoint].fileReaderFlow.event.correlator] ERROR *** Received number of files: 1
16:23:35,025 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** About to stop the file connector
16:23:35,025 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Stopped reading files

Note that:

  • The correlation id is incremented at the start of each file-reading round. The corresponding console output is, for instance, “incrementCorrelationId, new value: 3”.

  • Each time a file is read, the current correlation id is retrieved. Corresponding console output example: “getCorrelationId: 3”.

  • When the collection aggregator times out, the number of aggregated files is reported. Corresponding console output example: “Received number of files: 1”.
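Subtracting timestamps in the log above relates the output to the configuration. In this run, about 5 s pass between “Read a file” and “Files have been aggregated”, which is consistent with the 5000 ms aggregator timeout being counted from the first message of the group (an inference from this log only), and about 10 s pass between the increment and “About to stop the file connector”, matching the Thread.sleep(10000) reading window. The timestamps are copied from the log; the class name is illustrative.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Worked check of gaps between log timestamps (illustrative class name). */
class LogTimelineCheck {
    /** Milliseconds between two same-day log timestamps of the form HH:mm:ss,SSS. */
    static long millisBetween(String from, String to) {
        // The log uses a comma before the milliseconds; LocalTime.parse expects a dot.
        LocalTime start = LocalTime.parse(from.replace(',', '.'));
        LocalTime end = LocalTime.parse(to.replace(',', '.'));
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // "Read a file" -> "Files have been aggregated"
        System.out.println(millisBetween("16:23:26,043", "16:23:31,054")); // 5011
        // "incrementCorrelationId" -> "About to stop the file connector"
        System.out.println(millisBetween("16:23:25,022", "16:23:35,025")); // 10003
    }
}
```

The helper does not handle timestamps that cross midnight, which is not an issue for this log.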

Process Multiple Files

It seems like the new version of the scheduled file reading application performs as expected. Before giving a final verdict, let’s try it with multiple files.

  • Clear the IDE console. Right-click anywhere in the console and select Clear.

  • Select a number of files. Remember the number of files selected, since we are going to verify that all files were indeed processed.

  • Wait until the last output in the console reads “Stopped reading files”.

  • Copy the files to the inbox directory.

  • Observe the console output. You should see something similar to this:

18:09:14,768 [[quartztriggeredfileinboundendpoint].stopFileConnectorFlow.stage1.02] ERROR *** Stopped file connector
18:09:24,566 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Started reading files
18:09:24,566 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** incrementCorrelationId, new value: 2
18:09:34,629 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** About to stop the file connector
18:09:34,629 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Stopped reading files
18:09:54,566 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Started reading files
18:09:54,566 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** incrementCorrelationId, new value: 3
18:09:55,581 [[quartztriggeredfileinboundendpoint].fileReaderFlow.stage1.02] ERROR *** Read a file
18:09:55,581 [[quartztriggeredfileinboundendpoint].fileReaderFlow.stage1.02] ERROR *** getCorrelationId: 3
18:09:55,597 [[quartztriggeredfileinboundendpoint].fileReaderFlow.stage1.03] ERROR *** Read a file
...
18:10:00,621 [[quartztriggeredfileinboundendpoint].fileReaderFlow.event.correlator] ERROR *** Files have been aggregated
18:10:00,669 [[quartztriggeredfileinboundendpoint].fileReaderFlow.event.correlator] ERROR *** Received number of files: 271
18:10:04,569 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** About to stop the file connector
18:10:04,569 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Stopped reading files

Note that:

  • There is a log entry “Read a file” for each file read. This is the same as with the first version of the program.

  • After each “Read a file”, the current correlation id is retrieved. Remember that in order for all the files read at the same scheduled occasion to be grouped together, the same correlation id needs to be set for each file.

  • After all the files have been read, or more precisely, after the collection aggregator has timed out, the message “Files have been aggregated” is logged.

  • Finally, the number of files read at this particular scheduled occasion, which are now contained in the current Mule message, is logged to the console.
