File Splitter

The FileSplitter was added in version 4.1.2, and its namespace support was added in version 4.2. The FileSplitter splits text files into individual lines, based on BufferedReader.readLine(). By default, the splitter uses an Iterator to emit lines one at a time as they are read from the file. Setting the iterator property to false causes it to read all the lines into memory before emitting them as messages. One use case for this might be if you want to detect I/O errors on the file before sending any messages containing lines. However, it is only practical for relatively short files.
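
The difference between the two modes can be illustrated with a plain-JDK sketch. This is not the FileSplitter source; the class and method names (LineSplittingSketch, readAll, lazyLines) are hypothetical and only model the behavior described above, using BufferedReader.readLine() in both cases.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustrative only: a plain-JDK model of the two reading modes, not the FileSplitter source.
class LineSplittingSketch {

    // Eager mode (comparable to iterator=false): every line is read before anything
    // is returned, so an I/O error surfaces before any line is handed downstream.
    static List<String> readAll(Reader source) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Lazy mode (comparable to iterator=true): one line is read per step,
    // so memory use stays flat regardless of file size.
    static Iterator<String> lazyLines(Reader source) {
        final BufferedReader reader = new BufferedReader(source);
        return new Iterator<String>() {
            private String pending; // line read ahead by hasNext(), not yet returned
            private boolean done;   // true once the end of the input was reached

            @Override
            public boolean hasNext() {
                if (pending == null && !done) {
                    try {
                        pending = reader.readLine();
                        if (pending == null) {
                            done = true;
                            reader.close();
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
                return pending != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String line = pending;
                pending = null;
                return line;
            }
        };
    }
}
```

In the lazy variant, a read failure halfway through the input is raised only after the earlier lines have already been consumed, which is the trade-off the iterator property controls.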

Inbound payloads can be File, String (a File path), InputStream, or Reader. Other payload types are emitted unchanged.

The following listing shows possible ways to configure a FileSplitter:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 The bean name of the splitter.
2 Set to true (the default) to use an iterator or false to load the file into memory before sending lines.
3 Set to true to emit start-of-file and end-of-file marker messages before and after the file data. Markers are messages with FileSplitter.FileMarker payloads (with START and END values in the mark property). You might use markers when sequentially processing files in a downstream flow where some lines are filtered. They enable the downstream processing to know when a file has been completely processed. In addition, a file_marker header that contains START or END is added to these messages. The END marker includes a line count. If the file is empty, only START and END markers are emitted with 0 as the lineCount. The default is false. When true, apply-sequence is false by default. See also markers-json (the next attribute).
4 When markers is true, set this to true to have the FileMarker objects converted to a JSON string (a SimpleJsonSerializer is used underneath).
5 Set to false to disable the inclusion of sequenceSize and sequenceNumber headers in messages. The default is true, unless markers is true. When true and markers is true, the markers are included in the sequencing. When true and iterator is true, the sequenceSize header is set to 0, because the size is unknown.
6 Set to true to cause a RequiresReplyException to be thrown if there are no lines in the file. The default is false.
7 Set the charset name to be used when reading text data into String payloads. The default is the platform charset.
8 The header name for the first line to be carried as a header in the messages emitted for the remaining lines. Since version 5.0.
9 Set the input channel used to send messages to the splitter.
10 Set the output channel to which messages are sent.
11 Set the send timeout. Only applies if the output-channel can block — such as a full QueueChannel.
12 Set to false to disable automatically starting the splitter when the context is refreshed. The default is true.
13 Set the order of this endpoint if the input-channel is a <publish-subscribe-channel/>.
14 Set the startup phase for the splitter (used when auto-startup is true).
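
The marker behavior described for the markers attribute can be modeled with a small plain-JDK sketch. It is an illustration under stated assumptions rather than the splitter's implementation: the Emitted class is a hypothetical stand-in for a message whose payload is either a line or a FileSplitter.FileMarker, and the comment-line filter is an invented downstream rule.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; Emitted is hypothetical, not the FileSplitter.FileMarker type.
class MarkerSketch {

    static final class Emitted {
        final String mark;    // "START" or "END" for markers, null for a data line
        final String line;    // line text, null for markers
        final long lineCount; // meaningful only on the END marker

        Emitted(String mark, String line, long lineCount) {
            this.mark = mark;
            this.line = line;
            this.lineCount = lineCount;
        }
    }

    // Emission order with markers enabled: START, each line, then END with the line count.
    static List<Emitted> split(List<String> lines) {
        List<Emitted> out = new ArrayList<>();
        out.add(new Emitted("START", null, 0));
        for (String line : lines) {
            out.add(new Emitted(null, line, 0));
        }
        out.add(new Emitted("END", null, lines.size()));
        return out;
    }

    // A downstream consumer that drops comment lines yet still learns when the file is done.
    static String consume(List<Emitted> emitted) {
        int kept = 0;
        for (Emitted e : emitted) {
            if ("START".equals(e.mark)) {
                kept = 0;
            } else if ("END".equals(e.mark)) {
                return "kept " + kept + " of " + e.lineCount;
            } else if (!e.line.startsWith("#")) {
                kept++;
            }
        }
        return "incomplete"; // no END marker seen yet
    }
}
```

Without the END marker, the consumer could not distinguish "the remaining lines were filtered out" from "more lines are still on the way".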

The FileSplitter also splits any text-based InputStream into lines. Starting with version 4.3, when used in conjunction with an FTP or SFTP streaming inbound channel adapter or an FTP or SFTP outbound gateway that uses the stream option to retrieve a file, the splitter automatically closes the session that supports the stream when the file is completely consumed. See FTP Streaming Inbound Channel Adapter and SFTP Streaming Inbound Channel Adapter as well as FTP Outbound Gateway and SFTP Outbound Gateway for more information about these facilities.

When using Java configuration, an additional constructor is available, as the following example shows:

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

When markersJson is true, the markers are represented as a JSON string (using a SimpleJsonSerializer).

Version 5.0 introduced the firstLineAsHeader option to specify that the first line of content is a header (such as column names in a CSV file). The argument passed to this property is the header name under which the first line is carried as a header in the messages emitted for the remaining lines. This line is not included in the sequence header (if applySequence is true) nor in the lineCount associated with FileMarker.END. Note that, starting with version 5.5, the lineCount is also added to the headers of the FileMarker.END message as FileHeaders.LINE_COUNT, since the FileMarker could be serialized into JSON. If a file contains only the header line, the file is treated as empty and, therefore, only FileMarker instances are emitted during splitting (if markers are enabled; otherwise, no messages are emitted). By default (if no header name is set), the first line is considered to be data and becomes the payload of the first emitted message.
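
The effect of firstLineAsHeader on the emitted messages can be sketched in plain Java. The FirstLineAsHeaderSketch and Line names are hypothetical and stand in for the splitter and its output messages; the sketch only models the rules stated above (first line carried as a header, not emitted as a payload, and a header-only file treated as empty).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models the firstLineAsHeader rules, not the FileSplitter source.
class FirstLineAsHeaderSketch {

    // Hypothetical stand-in for an emitted message: a payload plus its headers.
    static final class Line {
        final String payload;
        final Map<String, String> headers;

        Line(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // headerName == null models the default: the first line is ordinary data.
    static List<Line> split(List<String> lines, String headerName) {
        List<Line> out = new ArrayList<>();
        if (headerName == null) {
            for (String line : lines) {
                out.add(new Line(line, Collections.<String, String>emptyMap()));
            }
            return out;
        }
        // The first line becomes a header value and is never emitted as a payload.
        String headerValue = lines.isEmpty() ? null : lines.get(0);
        for (int i = 1; i < lines.size(); i++) {
            Map<String, String> headers = new HashMap<>();
            headers.put(headerName, headerValue);
            out.add(new Line(lines.get(i), headers));
        }
        return out;
    }
}
```

For a CSV file whose first line is "id,name", every emitted row then carries that line under the chosen header name, so a downstream parser can map columns without re-reading the file.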

If you need more complex logic for extracting headers from the file content (not the first line, not the whole content of the line, not one particular header, and so on), consider using a header enricher ahead of the FileSplitter. Note that the lines that have been moved to the headers can be filtered out downstream from the normal content processing.

Idempotent Downstream Processing of a Split File

When apply-sequence is true, the splitter adds the line number in the SEQUENCE_NUMBER header (when markers is true, the markers are counted as lines). The line number can be used with an Idempotent Receiver to avoid reprocessing lines after a restart.

For example:

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
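
The decision made by the selector above can be modeled without any Spring classes. The following is a minimal sketch under stated assumptions: an in-memory ConcurrentHashMap stands in for the ZookeeperMetadataStore, the SequenceSelectorSketch name is hypothetical, and the logic mirrors only the idea of the compareValues predicate (accept a line when its sequence number is greater than the last one recorded for that file).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: an in-memory model of "accept if the sequence number advanced".
class SequenceSelectorSketch {

    // Key: absolute file path. Value: last accepted sequence number, stored as a String
    // because metadata stores hold String values.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Returns true when the line is new for this file, false when it was already seen.
    synchronized boolean accept(String filePath, int sequenceNumber) {
        String newVal = Integer.toString(sequenceNumber);
        String oldVal = store.get(filePath);
        if (oldVal == null) {
            // First line seen for this file.
            return store.putIfAbsent(filePath, newVal) == null;
        }
        if (Integer.parseInt(oldVal) < Integer.parseInt(newVal)) {
            // Same comparison as the compareValues lambda in the example above.
            return store.replace(filePath, oldVal, newVal);
        }
        return false;
    }
}
```

After a restart with a persistent store, lines 1 through N of a partially processed file are rejected and processing resumes at line N + 1. The model assumes that lines of one file arrive in order, since only the highest sequence number per file is kept.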