Schedule Reading of Multiple Files with Mule – Part One

By | October 21, 2013

In this example we will look at how to schedule reading of all files available in a directory. The actions that can be scheduled are not limited to file reading; the technique used in this example allows us to schedule anything that Mule can do.
As an extension to the example, we will also look at how the files read on one occasion can be collected into a list, so that each scheduled occasion results in one single Mule message containing a variable number of files.

The first version of this example’s Mule configuration looks like this:

[Figure: Graphical representation of this example’s Mule configuration, first version (Schedule reading of multiple files - Mule flows).]

The Mule configuration is quite straight-forward, with three different flows:

  • stopFileConnectorFlow
    Sets the state of the file connector used for the scheduled file reading to stopped.

  • quartzTriggerFlow
    Schedules reading of files by starting and stopping the designated file connector.

  • fileReaderFlow
    Using the designated file connector, reads files and performs any processing of the read files. In this example we will just log a message saying that a file has been read.

Input Data

We will need a number of files to drop in the designated directory. I will use the roughly 250 XML files from the root of the Mule source, but any kind of files will do. The size of the files will have some impact on the speed at which files are read and I encourage you to try files of different sizes. Big files and/or a large number of files may require adjusting the timing parameters in the example.

Create the Project

In your IDE, which is either some flavour of Eclipse with the MuleStudio plug-in installed or MuleStudio, create a new Mule project without any flow-files in it.

Create the Mule Configuration File

In the src/main/app directory in the Mule project, create a file named “quartztriggeredfileinboundendpoint.xml” with the following contents:

<?xml version="1.0" encoding="UTF-8"?>
<mule
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" 
    xmlns:quartz="http://www.mulesoft.org/schema/mule/quartz" 
    xmlns:file="http://www.mulesoft.org/schema/mule/file"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans"
    xmlns:test="http://www.mulesoft.org/schema/mule/test"
    version="CE-3.4.0" 
    xsi:schemaLocation="http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/quartz http://www.mulesoft.org/schema/mule/quartz/current/mule-quartz.xsd
http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd">
    
    <!--
        Non-streaming file connector that deletes files after reading.
        No polling frequency is specified, since we want to read all
        available files at each occasion as fast as possible and
        there will be a Quartz job that schedules reading of the files.
        Note that this file connector will be started and stopped according
        to schedule, so it should not be used with other file inbound endpoints.
    -->
    <file:connector
        name="nonStreamingFileConnector"
        streaming="false"
        autoDelete="true"/>

    <!--
        Quartz scheduler connector that only allows for one single thread
        to run scheduled jobs.
        In my case I want to limit the number of threads that can be created,
        in case something goes wrong with the job.
        Depending on what is to be scheduled, the maximum number of threads
        to run jobs can be adjusted.
    -->
    <quartz:connector name="oneThreadQuartzConnector">
        <quartz:factory-property key="org.quartz.threadPool.threadCount" value="1"/>
    </quartz:connector>
    
    <!--
        This flow executes once when the Mule application starts up
        and stops the file connector that is to be used when periodically
        reading data from a directory.
    -->
    <flow name="stopFileConnectorFlow">
        <!--
            A one-time trigger that executes after 1 ms.
        -->
        <quartz:inbound-endpoint
            jobName="EventGeneratorJob1"
            connector-ref="oneThreadQuartzConnector"
            repeatCount="0"
            repeatInterval="10000"
            startDelay="1">
            <quartz:event-generator-job>
                <quartz:payload>Triggered!</quartz:payload>
            </quartz:event-generator-job>
        </quartz:inbound-endpoint>
        
        <!--
            Stop the file connector reading files if it is started.
            Groovy is used in order to make sure the connector to be stopped
            is not already stopped.
        -->
        <scripting:component>
            <scripting:script engine="groovy">
                <scripting:text><![CDATA[
                    def theFileConnector = registry.'nonStreamingFileConnector'
                    if (theFileConnector.isStarted()) {
                        theFileConnector.stop()
                    }
                ]]></scripting:text>
            </scripting:script>
        </scripting:component>
        
        <logger level="ERROR" message="*** Stopped file connector"/>
    </flow>

    <!--
        This flow schedules and triggers the reading of the file(s)
        in a directory.
        The flow is asynchronous so that it just triggers subsequent actions
        without waiting for a response.
    -->
    <flow name="quartzTriggerFlow">
        <!--
            After an initial 10 second delay, generates a message every 30
            seconds with specified payload. Infinite repetition.
            Uses the one-thread Quartz connector defined above.
            In this application, the message is used to trigger subsequent
            actions in the flow and the payload does not matter.
            If the flow is synchronous, the response timeout on the quartz
            inbound endpoint must be set to give the file reading flow enough
            time to read many files and/or large files.
        -->
        <quartz:inbound-endpoint
            jobName="EventGeneratorJob2"
            connector-ref="oneThreadQuartzConnector"
            repeatInterval="30000"
            repeatCount="-1"
            startDelay="10000">
            <quartz:event-generator-job>
                <quartz:payload>Triggered!</quartz:payload>
            </quartz:event-generator-job>
        </quartz:inbound-endpoint>
        
        <logger level="ERROR" message="*** Started reading files"/>

        <!--
            Start the file reading connector and wait some time to give it
            time to read and process files.
        -->
        <scripting:component>
            <scripting:script engine="groovy">
                <scripting:text><![CDATA[
                    def theFileConnector = registry.'nonStreamingFileConnector'
                    if (!theFileConnector.isStarted()) {
                        theFileConnector.start()
                    }
                    
                    java.lang.Thread.sleep(10000)
                ]]></scripting:text>
            </scripting:script>
        </scripting:component>
        
        <logger level="ERROR" message="*** About to stop the file connector"/>
        
        <!--
            Stop the file connector if it is started.
        -->
        <scripting:component>
            <scripting:script engine="groovy">
                <scripting:text><![CDATA[
                    def theFileConnector = registry.'nonStreamingFileConnector'
                    if (theFileConnector.isStarted()) {
                        theFileConnector.stop()
                    }
                ]]></scripting:text>
            </scripting:script>
        </scripting:component>
        
        <logger level="ERROR" message="*** Stopped reading files"/>
    </flow>
    
    <!--
        The file reader flow that is responsible for reading and processing 
        files from a designated directory.
        As an alternative to starting and stopping the file connector,
        this entire flow can be started and stopped. In such a case,
        set the initialState attribute on the flow to "stopped" and
        start and stop the flow similar to how the file connector
        is started and stopped.
    -->
    <flow name="fileReaderFlow">
        <file:inbound-endpoint
            path="inbox"
            connector-ref="nonStreamingFileConnector">
        </file:inbound-endpoint>
        
        <logger level="ERROR" message="*** Read a file"/>
    </flow>
</mule>

Note that:

  • A non-streaming file connector named “nonStreamingFileConnector” is used.
    We want to read the entire contents of all available files in the drop directory and delete them as fast as possible, in order to allow for new files to be copied into the directory.

  • There is no polling frequency specified on the file connector.
    Later in the flow there is a Quartz connector and a Quartz inbound endpoint which will be responsible for scheduling activation of the file inbound endpoint.

  • The file connector must not be used with other file inbound endpoints that are not to be scheduled.

  • The Quartz connector with the name “oneThreadQuartzConnector” is configured to use one single thread.
    Only one single job at a time is expected to execute and this configuration is a precaution in case of problems occurring when performing the periodic actions. If something prevents the periodic action from completing and there are multiple Quartz threads allowed, Quartz will use a new thread for each occasion, which may lead to excessive resource usage. It may also be more difficult to follow the actions of multiple simultaneous threads.

  • There are three flows in the Mule configuration file.
    Each one of these flows will be analyzed in a separate section below.

The stopFileConnectorFlow Flow

The first flow is named “stopFileConnectorFlow” and executes once to stop the file connector. This is needed because we cannot specify the initial state of the file connector. The analysis of the second flow covers the stopping of the file connector, which also appears in this flow, and will not be repeated here.

  • The repeatCount attribute on the Quartz inbound endpoint in the stopFileConnectorFlow flow is set to zero.
    This causes the flow to be executed one single time. If the repeat count had been one, the flow would have been executed twice. A value of -1 will cause the trigger to repeat indefinitely.

  • The repeatInterval attribute is specified on the Quartz inbound endpoint in the stopFileConnectorFlow flow.
    Despite not being relevant to this trigger, since it is configured to execute only once, this attribute or the cronExpression attribute must be specified on Quartz inbound endpoints.
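
As a rough sketch of the cronExpression alternative mentioned above, the endpoint below uses a Quartz cron expression instead of repeatInterval and repeatCount. The job name EventGeneratorJobCron is made up for illustration, and the fragment is assumed to sit inside a flow in the configuration file shown earlier, so that the quartz namespace and the oneThreadQuartzConnector are available.

```xml
<!--
    Hypothetical variant of a Quartz trigger.
    The cron fields are: seconds, minutes, hours, day-of-month,
    month and day-of-week. This expression fires at second 0 and
    second 30 of every minute.
-->
<quartz:inbound-endpoint
    jobName="EventGeneratorJobCron"
    connector-ref="oneThreadQuartzConnector"
    cronExpression="0/30 * * * * ?">
    <quartz:event-generator-job>
        <quartz:payload>Triggered!</quartz:payload>
    </quartz:event-generator-job>
</quartz:inbound-endpoint>
```

Note that a cron trigger follows the wall clock, so it is better suited to repeating schedules than to a one-time trigger like the one used in stopFileConnectorFlow.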

The quartzTriggerFlow Flow

The second flow is named “quartzTriggerFlow” and is the flow responsible for triggering the periodic action, which in this example is reading of files.

  • The quartzTriggerFlow contains a Quartz inbound endpoint.
    This inbound endpoint controls the timing of the scheduled actions. In this example there will be an initial delay of 10 seconds, specified by the startDelay attribute, and repetition interval of 30 seconds, specified by the repeatInterval attribute. The number of repetitions is specified by the repeatCount attribute, which is set to -1 to yield infinite repetition.

  • The Quartz inbound endpoint contains a <quartz:event-generator-job> element with a <quartz:payload> child element.
    This construction causes a Mule message with the payload specified in the <quartz:payload> element to be sent according to the schedule specified in the Quartz inbound endpoint.
    In this example the message payload is not used and the string payload “Triggered!” is arbitrarily chosen.

  • The first scripting component in the quartzTriggerFlow starts the nonStreamingFileConnector file connector and waits for ten seconds.
    Depending on the expected number of files and/or the expected size of the files to be processed, the delay may need adjustment.
    The reason for using Groovy scripting is that I want to ensure that the file connector is only started if it is not already started.

  • The second scripting component in the quartzTriggerFlow stops the nonStreamingFileConnector file connector in a way similar to the starting of the same file connector.
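
One possible variation, shown here only as a sketch, is to merge the two scripting components into one and wrap the waiting in try/finally, so that the connector is stopped even if starting or sleeping throws an exception. It uses the same registry lookup and connector methods as the configuration above. The price is that the “About to stop the file connector” log message between the two steps is lost.

```xml
<scripting:component>
    <scripting:script engine="groovy">
        <scripting:text><![CDATA[
            def theFileConnector = registry.'nonStreamingFileConnector'
            try {
                // Open the reading window if it is not already open.
                if (!theFileConnector.isStarted()) {
                    theFileConnector.start()
                }
                // Same ten second reading window as in the example.
                java.lang.Thread.sleep(10000)
            } finally {
                // Executed even if start() or sleep() fails.
                if (theFileConnector.isStarted()) {
                    theFileConnector.stop()
                }
            }
        ]]></scripting:text>
    </scripting:script>
</scripting:component>
```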

The fileReaderFlow Flow

The third flow is named “fileReaderFlow”. This flow contains the actions to be performed periodically.

  • The fileReaderFlow contains one file inbound endpoint.
    This is the file endpoint that will read the files according to the schedule discussed above. It uses the nonStreamingFileConnector that is defined earlier and that is controlled by the quartzTriggerFlow, as described in the previous section.

  • The flow contains a logger component that logs a message stating that a file was read.
    This is where one would want to insert any processing of an individual file read on a scheduled occasion.
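
As a small illustration of per-file processing, the sketch below logs the name of each file instead of a fixed text. It assumes Mule 3.3 or later, where expressions of the form #[...] are evaluated by the Mule Expression Language, and it relies on the originalFilename inbound property that the file transport sets on messages it creates.

```xml
<flow name="fileReaderFlow">
    <file:inbound-endpoint
        path="inbox"
        connector-ref="nonStreamingFileConnector"/>

    <!-- Logs the name of the file that was just read. -->
    <logger level="ERROR"
        message="*** Read file #[message.inboundProperties['originalFilename']]"/>
</flow>
```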

Create the Log4J Configuration File

In order to limit the amount of logging and to print the name of the thread that writes each log entry, we use the following, rather minimalistic, Log4J configuration file:

  • In src/main/resources of the example project, create a file named “log4j.properties” with the following contents:

log4j.rootLogger=ERROR, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} [%t] %p %m%n

Run the First Version of the Example Program

If we now right-click the example program project in the IDE and select Run As → Mule Application, a Mule instance should start up without errors. After some time, there should be log output in the console similar to this:

17:02:47,909 [[quartztriggeredfileinboundendpoint].stopFileConnectorFlow.stage1.02] ERROR *** Stopped file connector
17:02:57,754 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Started reading files
17:03:07,881 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** About to stop the file connector
17:03:07,896 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Stopped reading files

Note how the name of the flow from which a console message is logged appears somewhere before the “ERROR” string on the line in question.
We can see that the stopFileConnectorFlow set the state of the file connector to stopped, as expected. Ten seconds later, the quartzTriggerFlow starts the file connector and stops it again after another ten seconds have passed.

Process One File

We are going to drop one single file into the inbox directory that appears in the example project in the IDE. If the directory is not visible, please refresh the entire project before proceeding.

  • Wait until the last output in the console reads “Stopped reading files”.

  • Copy one file to the inbox directory.

  • Observe the console output.
    For this last round of file reading, you should see output similar to the following in the console:

...
17:29:31,494 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Stopped reading files
17:29:51,354 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Started reading files
17:29:52,437 [[quartztriggeredfileinboundendpoint].fileReaderFlow.stage1.02] ERROR *** Read a file
17:30:01,377 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** About to stop the file connector
17:30:01,377 [[quartztriggeredfileinboundendpoint].quartzTriggerFlow.stage1.02] ERROR *** Stopped reading files

We can see that soon after the quartzTriggerFlow announced “Started reading files”, the fileReaderFlow reported “Read a file”. After some time, the quartzTriggerFlow logged “Stopped reading files”.

Process Multiple Files

Instead of dropping just a single file into the inbox directory, we are now going to drop multiple files into the inbox directory.

  • Clear the IDE console.
    Right-click anywhere in the console and select Clear.

  • Select a number of files.
    Remember the number of files selected, since we are going to verify that all files were indeed processed.

  • Wait until the last output in the console reads “Stopped reading files”.

  • Copy the files to the inbox directory.

  • Observe the console output.

The console output will be similar to that when we dropped one single file in the inbox, but with multiple lines saying “Read a file”. In fact, the number of lines saying “Read a file” should match the number of files dropped in the inbox, unless the number of files was very large and/or one or more of the files were large.

If the number of lines reading “Read a file” does not match the number of files dropped in the inbox, try experimenting with the timing constants in the quartzTriggerFlow and/or the fileReaderFlow flows.
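
The fragment below sketches what such an experiment could look like in the quartzTriggerFlow: the reading window is widened from 10 to 30 seconds and the trigger interval from 30 to 60 seconds. The numbers are only illustrative. The assumption behind them is that the reading window should stay shorter than the trigger interval, so that one round has finished before the next one starts.

```xml
<flow name="quartzTriggerFlow">
    <!-- Illustrative values: one round every 60 seconds. -->
    <quartz:inbound-endpoint
        jobName="EventGeneratorJob2"
        connector-ref="oneThreadQuartzConnector"
        repeatInterval="60000"
        repeatCount="-1"
        startDelay="10000">
        <quartz:event-generator-job>
            <quartz:payload>Triggered!</quartz:payload>
        </quartz:event-generator-job>
    </quartz:inbound-endpoint>

    <scripting:component>
        <scripting:script engine="groovy">
            <scripting:text><![CDATA[
                def theFileConnector = registry.'nonStreamingFileConnector'
                if (!theFileConnector.isStarted()) {
                    theFileConnector.start()
                }

                // Illustrative value: 30 second reading window.
                java.lang.Thread.sleep(30000)

                if (theFileConnector.isStarted()) {
                    theFileConnector.stop()
                }
            ]]></scripting:text>
        </scripting:script>
    </scripting:component>
</flow>
```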

Notes on Scheduling with Mule

In this example a connector is turned on and off in order to schedule the accepting of input to a flow. This has the advantage of allowing the flow to continue processing any messages that have already entered the flow, even after the connector has been turned off.
If there is no connector in the flow whose actions are to be scheduled, there is also the option of starting and stopping the entire flow. This can be accomplished in a fashion similar to starting and stopping a connector. In that case there is no need for a separate flow that stops it at startup, the way stopFileConnectorFlow stops the connector in this example. Instead, the initialState attribute of the flow whose actions are to be scheduled can be set to “stopped”.
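
A minimal sketch of the flow-based alternative could look as follows. It assumes that the flow can be looked up by name with lookupFlowConstruct on the Mule 3 registry and that the returned flow offers isStarted(), start() and stop(), much like the connector does. The scripting component is meant to take the place of the connector-starting script in a trigger flow such as quartzTriggerFlow. Treat this as an untested outline rather than a drop-in replacement.

```xml
<!-- The flow to be scheduled does not start with the application. -->
<flow name="fileReaderFlow" initialState="stopped">
    <file:inbound-endpoint
        path="inbox"
        connector-ref="nonStreamingFileConnector"/>

    <logger level="ERROR" message="*** Read a file"/>
</flow>

<!-- Placed in the trigger flow: start the flow, wait, stop it again. -->
<scripting:component>
    <scripting:script engine="groovy">
        <scripting:text><![CDATA[
            // Assumption: the registry can look up a flow by its name.
            def theFlow = registry.lookupFlowConstruct('fileReaderFlow')
            if (!theFlow.isStarted()) {
                theFlow.start()
            }

            java.lang.Thread.sleep(10000)

            if (theFlow.isStarted()) {
                theFlow.stop()
            }
        ]]></scripting:text>
    </scripting:script>
</scripting:component>
```

Unlike stopping only the connector, stopping the whole flow also affects messages that are still being processed in it, so the waiting time may need a larger margin.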

To Be Continued…

In the next post, we’ll look at how to correlate all the files read at one single scheduled occasion so that they are all contained in one Mule message.
