File Aggregator
Starting with version 5.5, a FileAggregator is introduced to cover the other side of the FileSplitter use case when START/END markers are enabled.
For convenience, the FileAggregator implements all three sequence details strategies:
- The HeaderAttributeCorrelationStrategy with the FileHeaders.FILENAME attribute is used for correlation key calculation. When markers are enabled on the FileSplitter, it does not populate sequence details headers, since START/END marker messages are also included in the sequence size. The FileHeaders.FILENAME header is still populated for each line emitted, including START/END marker messages.
- The FileMarkerReleaseStrategy checks for a FileSplitter.FileMarker.Mark.END message in the group and then compares the FileHeaders.LINE_COUNT header value with the group size minus 2 (the two FileSplitter.FileMarker instances). It also implements a convenient GroupConditionProvider contract for the conditionSupplier function to be used in the AbstractCorrelatingMessageHandler. See Message Group Condition for more information.
- The FileAggregatingMessageGroupProcessor just removes FileSplitter.FileMarker messages from the group and collects the rest of the messages into a list payload to produce.
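The release and processing rules described above can be modeled in plain Java without the framework. The following is only a simplified sketch of the described behavior, not the library's implementation: the Item class and the canRelease and aggregate methods are hypothetical names introduced for illustration, and a real group holds Message objects with headers rather than these stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

final class FileAggregatorSketch {

    // Hypothetical stand-in for a group member: a START/END marker or a file line.
    static final class Item {
        final String mark;     // "START", "END", or null for a regular line
        final long lineCount;  // only meaningful on the END marker
        final String line;     // only meaningful for a regular line

        private Item(String mark, long lineCount, String line) {
            this.mark = mark;
            this.lineCount = lineCount;
            this.line = line;
        }

        static Item start() { return new Item("START", 0, null); }
        static Item end(long lineCount) { return new Item("END", lineCount, null); }
        static Item line(String text) { return new Item(null, 0, text); }
    }

    // Release once an END marker is present and its line count equals
    // the group size minus the two markers.
    static boolean canRelease(List<Item> group) {
        for (Item item : group) {
            if ("END".equals(item.mark)) {
                return item.lineCount == group.size() - 2;
            }
        }
        return false;
    }

    // Drop the markers and collect the remaining lines into a list payload.
    static List<String> aggregate(List<Item> group) {
        List<String> lines = new ArrayList<>();
        for (Item item : group) {
            if (item.mark == null) {
                lines.add(item.line);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Item> group = new ArrayList<>();
        group.add(Item.start());
        group.add(Item.line("A"));
        group.add(Item.end(2));                 // END may arrive before the last line
        System.out.println(canRelease(group));  // false: 2 lines expected, 1 present
        group.add(Item.line("B"));
        System.out.println(canRelease(group));  // true: 4 items minus 2 markers equals 2
        System.out.println(aggregate(group));   // [A, B]
    }
}
```

The sketch also shows why the line count is compared rather than just waiting for the END marker: when lines are processed concurrently, the END marker can reach the group before the last line does.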
The following listings show possible ways to configure a FileAggregator with the Java DSL, the Kotlin DSL, plain Java configuration, and XML, in that order:
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            // Markers fail the filter and are discarded straight to the aggregator,
            // so only real lines reach the transformer.
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}

@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }

@ServiceActivator(inputChannel = "toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    // outputChannel is assumed to be a MessageChannel available in the enclosing configuration class
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}

<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>

If the default behavior of the FileAggregator does not satisfy the target logic, it is recommended to configure an aggregator endpoint with individual strategies.
See the FileAggregator JavaDocs for more information.