For the latest stable version, please use Spring Integration 6.4.1!

File Aggregator

Starting with version 5.5, a FileAggregator is introduced to cover other side of FileSplitter use-case when START/END markers are enabled. For convenience the FileAggregator implements all three sequence details strategies:

  • The HeaderAttributeCorrelationStrategy with the FileHeaders.FILENAME attribute is used for correlation key calculation. When markers are enabled on the FileSplitter, it does not populate sequence details headers, since START/END marker messages are also included into the sequence size. The FileHeaders.FILENAME is still populated for each line emitted, including START/END marker messages.

  • The FileMarkerReleaseStrategy - checks for FileSplitter.FileMarker.Mark.END message in the group and then compare a FileHeaders.LINE_COUNT header value with the group size minus 2 - FileSplitter.FileMarker instances. It also implements a convenient GroupConditionProvider contact for conditionSupplier function to be used in the AbstractCorrelatingMessageHandler. See Message Group Condition for more information.

  • The FileAggregatingMessageGroupProcessor just removes FileSplitter.FileMarker messages from the group and collect the rest of messages into a list payload to produce.

The following listing shows possible ways to configure a FileAggregator:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}
<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>

If default behavior of the FileAggregator does not satisfy the target logic, it is recommended to configure an aggregator endpoint with individual strategies. See FileAggregator JavaDocs for more information.