| This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.5.3! | 
File Splitter
The FileSplitter was added in version 4.1.2, and its namespace support was added in version 4.2.
The FileSplitter splits text files into individual lines, based on BufferedReader.readLine().
By default, the splitter uses an Iterator to emit file lines one at a time as they are read from the file.
Setting the iterator property to false causes it to read all the lines into memory before emitting them as messages.
One use case for this might be if you want to detect I/O errors on the file before sending any messages containing lines.
However, it is only practical for relatively short files.
Inbound payloads can be File, String (a File path), InputStream, or Reader.
Other payload types are emitted unchanged.
The following listing shows possible ways to configure a FileSplitter:
- 
Java DSL 
- 
Kotlin DSL 
- 
Java 
- 
XML 
@SpringBootApplication
public class FileSplitterApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }
    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }
}@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)| 1 | The bean name of the splitter. | 
| 2 | Set to true(the default) to use an iterator orfalseto load the file into memory before sending lines. | 
| 3 | Set to trueto emit start-of-file and end-of-file marker messages before and after the file data.
Markers are messages withFileSplitter.FileMarkerpayloads (withSTARTandENDvalues in themarkproperty).
You might use markers when sequentially processing files in a downstream flow where some lines are filtered.
They enable the downstream processing to know when a file has been completely processed.
In addition, afile_markerheader that containsSTARTorENDis added to these messages.
TheENDmarker includes a line count.
If the file is empty, onlySTARTandENDmarkers are emitted with0as thelineCount.
The default isfalse.
Whentrue,apply-sequenceisfalseby default.
See alsomarkers-json(the next attribute). | 
| 4 | When markersis true, set this totrueto have theFileMarkerobjects be converted to a JSON string.
(Uses aSimpleJsonSerializerunderneath). | 
| 5 | Set to falseto disable the inclusion ofsequenceSizeandsequenceNumberheaders in messages.
The default istrue, unlessmarkersistrue.
Whentrueandmarkersistrue, the markers are included in the sequencing.
Whentrueanditeratoristrue, thesequenceSizeheader is set to0, because the size is unknown. | 
| 6 | Set to trueto cause aRequiresReplyExceptionto be thrown if there are no lines in the file.
The default isfalse. | 
| 7 | Set the charset name to be used when reading text data into Stringpayloads.
The default is the platform charset. | 
| 8 | The header name for the first line to be carried as a header in the messages emitted for the remaining lines. Since version 5.0. | 
| 9 | Set the input channel used to send messages to the splitter. | 
| 10 | Set the output channel to which messages are sent. | 
| 11 | Set the send timeout.
Only applies if the output-channelcan block — such as a fullQueueChannel. | 
| 12 | Set to falseto disable automatically starting the splitter when the context is refreshed.
The default istrue. | 
| 13 | Set the order of this endpoint if the input-channelis a<publish-subscribe-channel/>. | 
| 14 | Set the startup phase for the splitter (used when auto-startupistrue). | 
The FileSplitter also splits any text-based InputStream into lines.
Starting with version 4.3, when used in conjunction with an FTP or SFTP streaming inbound channel adapter or an FTP or SFTP outbound gateway that uses the stream option to retrieve a file, the splitter automatically closes the session that supports the stream when the file is completely consumed
See FTP Streaming Inbound Channel Adapter and SFTP Streaming Inbound Channel Adapter as well as FTP Outbound Gateway and SFTP Outbound Gateway for more information about these facilities.
When using Java configuration, an additional constructor is available, as the following example shows:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)When markersJson is true, the markers are represented as a JSON string (using a SimpleJsonSerializer).
Version 5.0 introduced the firstLineAsHeader option to specify that the first line of content is a header (such as column names in a CSV file).
The argument passed to this property is the header name under which the first line is carried as a header in the messages emitted for the remaining lines.
This line is not included in the sequence header (if applySequence is true) nor in the lineCount associated with FileMarker.END.
NOTE: Starting with version 5.5, the lineCount` is also included as a FileHeaders.LINE_COUNT into headers of the FileMarker.END message, since the FileMarker could be serialized into JSON.
If a file contains only the header line, the file is treated as empty and, therefore, only FileMarker instances are emitted during splitting (if markers are enabled — otherwise, no messages are emitted).
By default, (if no header name is set), the first line is considered to be data and becomes the payload of the first emitted message.
If you need more complex logic about header extraction from the file content (not first line, not the whole content of the line, not one particular header, and so on), consider using  header enricher ahead of the FileSplitter.
Note that the lines that have been moved to the headers might be filtered downstream from the normal content process.
Idempotent Downstream Processing a Split File
When apply-sequence is true, the splitter adds the line number in the SEQUENCE_NUMBER header (when markers is true, the markers are counted as lines).
The line number can be used with an Idempotent Receiver to avoid reprocessing lines after a restart.
For example:
@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}