15. FTP/FTPS Adapters

Spring Integration provides support for file transfer operations via FTP and FTPS.

15.1 Introduction

The File Transfer Protocol (FTP) is a simple network protocol which allows you to transfer files between two computers on the Internet.

There are two actors when it comes to FTP communication: client and server. To transfer files with FTP/FTPS, you use a client which initiates a connection to a remote computer that is running an FTP server. After the connection is established, the client can choose to send and/or receive copies of files.

Spring Integration supports sending and receiving files over FTP/FTPS by providing three client side endpoints: Inbound Channel Adapter, Outbound Channel Adapter, and Outbound Gateway. It also provides convenient namespace-based configuration options for defining these client components.

To use the FTP namespace, add the following to the header of your XML file:

xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
    https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"

15.2 FTP Session Factory

[Important]Important

Starting with version 3.0, sessions are no longer cached by default. See Section 15.8, “FTP Session Caching”.

Before configuring FTP adapters you must configure an FTP Session Factory. You can configure the FTP Session Factory with a regular bean definition where the implementation class is org.springframework.integration.ftp.session.DefaultFtpSessionFactory: Below is a basic configuration:

<bean id="ftpClientFactory"
    class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="localhost"/>
    <property name="port" value="22"/>
    <property name="username" value="kermit"/>
    <property name="password" value="frog"/>
    <property name="clientMode" value="0"/>
    <property name="fileType" value="2"/>
    <property name="bufferSize" value="100000"/>
</bean>

For FTPS connections all you need to do is use org.springframework.integration.ftp.session.DefaultFtpsSessionFactory instead. Below is the complete configuration sample:

<bean id="ftpClientFactory"
    class="org.springframework.integration.ftp.client.DefaultFtpsClientFactory">
    <property name="host" value="localhost"/>
    <property name="port" value="22"/>
    <property name="username" value="oleg"/>
    <property name="password" value="password"/>
    <property name="clientMode" value="1"/>
    <property name="fileType" value="2"/>
    <property name="useClientMode" value="true"/>
    <property name="cipherSuites" value="a,b.c"/>
    <property name="keyManager" ref="keyManager"/>
    <property name="protocol" value="SSL"/>
    <property name="trustManager" ref="trustManager"/>
    <property name="prot" value="P"/>
    <property name="needClientAuth" value="true"/>
    <property name="authValue" value="oleg"/>
    <property name="sessionCreation" value="true"/>
    <property name="protocols" value="SSL, TLS"/>
    <property name="implicit" value="true"/>
</bean>

Every time an adapter requests a session object from its SessionFactory the session is returned from a session pool maintained by a caching wrapper around the factory. A Session in the session pool might go stale (if it has been disconnected by the server due to inactivity) so the SessionFactory will perform validation to make sure that it never returns a stale session to the adapter. If a stale session was encountered, it will be removed from the pool, and a new one will be created.

[Note]Note

If you experience connectivity problems and would like to trace Session creation as well as see which Sessions are polled you may enable it by setting the logger to TRACE level (e.g., log4j.category.org.springframework.integration.file=TRACE)

Now all you need to do is inject these session factories into your adapters. Obviously the protocol (FTP or FTPS) that an adapter will use depends on the type of session factory that has been injected into the adapter.

[Note]Note

A more practical way to provide values for FTP/FTPS Session Factories is by using Spring’s property placeholder support (See: https://docs.spring.io/spring/docs/current/spring-framework-reference/html/beans.html#beans-factory-placeholderconfigurer).

Advanced Configuration

DefaultFtpSessionFactory provides an abstraction over the underlying client API which, since Spring Integration 2.0, is Apache Commons Net. This spares you from the low level configuration details of the org.apache.commons.net.ftp.FTPClient. Several common properties are exposed on the session factory (since version 4.0, this now includes connectTimeout, defaultTimeout and dataTimeout). However there are times when access to lower level FTPClient configuration is necessary to achieve more advanced configuration (e.g., setting the port range for active mode etc.). For that purpose, AbstractFtpSessionFactory (the base class for all FTP Session Factories) exposes hooks, in the form of the two post-processing methods below.

/**
 * Will handle additional initialization after client.connect() method was invoked,
 * but before any action on the client has been taken
 */
protected void postProcessClientAfterConnect(T t) throws IOException {
    // NOOP
}
/**
 * Will handle additional initialization before client.connect() method was invoked.
 */
protected void postProcessClientBeforeConnect(T client) throws IOException {
    // NOOP
}

As you can see, there is no default implementation for these two methods. However, by extending DefaultFtpSessionFactory you can override these methods to provide more advanced configuration of the FTPClient. For example:

public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory {

    protected void postProcessClientBeforeConnect(FTPClient ftpClient) throws IOException {
       ftpClient.setActivePortRange(4000, 5000);
    }
}

15.3 Delegating Session Factory

Version 4.2 introduced the DelegatingSessionFactory which allows the selection of the actual session factory at runtime. Prior to invoking the ftp endpoint, call setThreadKey() on the factory to associate a key with the current thread. That key is then used to lookup the actual session factory to be used. The key can be cleared by calling clearThreadKey() after use.

Convenience methods have been added so this can easily be done from a message flow:

<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
    <constructor-arg>
        <bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
            <!-- delegate factories here -->
        </bean>
    </constructor-arg>
</bean>

<int:service-activator input-channel="in" output-channel="c1"
        expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />

<int-ftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />

<int:service-activator input-channel="c2" output-channel="out"
        expression="@dsf.clearThreadKey(#root)" />
[Important]Important

When using session caching (see Section 15.8, “FTP Session Caching”), each of the delegates should be cached; you cannot cache the DelegatingSessionFactory itself.

15.4 FTP Inbound Channel Adapter

The FTP Inbound Channel Adapter is a special listener that will connect to the FTP server and will listen for the remote directory events (e.g., new file created) at which point it will initiate a file transfer.

<int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    auto-create-local-directory="true"
    delete-remote-files="true"
    filename-pattern="*.txt"
    remote-directory="some/remote/path"
    remote-file-separator="/"
    preserve-timestamp="true"
    local-filename-generator-expression="#this.toUpperCase() + '.a'"
    local-filter="myFilter"
    temporary-file-suffix=".writing"
    local-directory=".">
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

As you can see from the configuration above you can configure an FTP Inbound Channel Adapter via the inbound-channel-adapter element while also providing values for various attributes such as local-directory, filename-pattern (which is based on simple pattern matching, not regular expressions), and of course the reference to a session-factory.

By default the transferred file will carry the same name as the original file. If you want to override this behavior you can set the local-filename-generator-expression attribute which allows you to provide a SpEL Expression to generate the name of the local file. Unlike outbound gateways and adapters where the root object of the SpEL Evaluation Context is a Message, this inbound adapter does not yet have the Message at the time of evaluation since that’s what it ultimately generates with the transferred file as its payload. So, the root object of the SpEL Evaluation Context is the original name of the remote file (String).

Starting with Spring Integration 3.0, you can specify the preserve-timestamp attribute (default false); when true, the local file’s modified timestamp will be set to the value retrieved from the server; otherwise it will be set to the current time.

Starting with version 4.2, you can specify remote-directory-expression instead of remote-directory, allowing you to dynamically determine the directory on each poll. e.g remote-directory-expression="@myBean.determineRemoteDir()".

Starting with version 4.3, the remote-directory/remote-directory-expression attributes can be omitted assuming null. In this case, according to the FTP protocol, the Client working directory is used as a default remote directory.

Sometimes file filtering based on the simple pattern specified via filename-pattern attribute might not be sufficient. If this is the case, you can use the filename-regex attribute to specify a Regular Expression (e.g. filename-regex=".*\.test$"). And of course if you need complete control you can use filter attribute and provide a reference to any custom implementation of the org.springframework.integration.file.filters.FileListFilter, a strategy interface for filtering a list of files. This filter determines which remote files are retrieved. You can also combine a pattern based filter with other filters, such as an AcceptOnceFileListFilter to avoid synchronizing files that have previously been fetched, by using a CompositeFileListFilter.

The AcceptOnceFileListFilter stores its state in memory. If you wish the state to survive a system restart, consider using the FtpPersistentAcceptOnceFileListFilter instead. This filter stores the accepted file names in an instance of the MetadataStore strategy (Section 9.5, “Metadata Store”). This filter matches on the filename and the remote modified time.

Since version 4.0, this filter requires a ConcurrentMetadataStore. When used with a shared data store (such as Redis with the RedisMetadataStore) this allows filter keys to be shared across multiple application or server instances.

The above discussion refers to filtering the files before retrieving them. Once the files have been retrieved, an additional filter is applied to the files on the file system. By default, this is an`AcceptOnceFileListFilter` which, as discussed, retains state in memory and does not consider the file’s modified time. Unless your application removes files after processing, the adapter will re-process the files on disk by default after an application restart.

Also, if you configure the filter to use a FtpPersistentAcceptOnceFileListFilter, and the remote file timestamp changes (causing it to be re-fetched), the default local filter will not allow this new file to be processed.

Use the local-filter attribute to configure the behavior of the local file system filter. Starting with verion 4.3.8, a FileSystemPersistentAcceptOnceFileListFilter is configured by default. This filter stores the accepted file names and modified timestamp in an instance of the MetadataStore strategy (Section 9.5, “Metadata Store”), and will detect changes to the local file modified time. The default MetadataStore is a SimpleMetadataStore which stores state in memory.

Since version 4.1.5, these filters have a new property flushOnUpdate which will cause them to flush the metadata store on every update (if the store implements Flushable).

[Important]Important

Further, if you use a distributed MetadataStore (such as Section 24.5, “Redis Metadata Store” or Section 16.7, “Gemfire Metadata Store”) you can have multiple instances of the same adapter/application and be sure that one and only one will process a file.

The actual local filter is a CompositeFileListFilter containing the supplied filter and a pattern filter that prevents processing files that are in the process of being downloaded (based on the temporary-file-suffix); files are downloaded with this suffix (default: .writing) and the file is renamed to its final name when the transfer is complete, making it visible to the filter.

The remote-file-separator attribute allows you to configure a file separator character to use if the default / is not applicable for your particular environment.

Please refer to the schema for more details on these attributes.

It is also important to understand that the FTP Inbound Channel Adapter is a Polling Consumer and therefore you must configure a poller (either via a global default or a local sub-element). Once a file has been transferred, a Message with a java.io.File as its payload will be generated and sent to the channel identified by the channel attribute.

More on File Filtering and Large Files

Sometimes the file that just appeared in the monitored (remote) directory is not complete. Typically such a file will be written with temporary extension (e.g., foo.txt.writing) and then renamed after the writing process finished. As a user in most cases you are only interested in files that are complete and would like to filter only files that are complete. To handle these scenarios you can use the filtering support provided by the filename-pattern, filename-regex and filter attributes. Here is an example that uses a custom Filter implementation.

<int-ftp:inbound-channel-adapter
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    filter="customFilter"
    local-directory="file:/my_transfers">
    remote-directory="some/remote/path"
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

<bean id="customFilter" class="org.example.CustomFilter"/>

Poller configuration notes for the inbound FTP adapter

The job of the inbound FTP adapter consists of two tasks: 1) Communicate with a remote server in order to transfer files from a remote directory to a local directory. 2) For each transferred file, generate a Message with that file as a payload and send it to the channel identified by the channel attribute. That is why they are called channel-adapters rather than just adapters. The main job of such an adapter is to generate a Message to be sent to a Message Channel. Essentially, the second task mentioned above takes precedence in such a way that IF your local directory already has one or more files it will first generate Messages from those, and ONLY when all local files have been processed, will it initiate the remote communication to retrieve more files.

Also, when configuring a trigger on the poller you should pay close attention to the max-messages-per-poll attribute. Its default value is 1 for all SourcePollingChannelAdapter instances (including FTP). This means that as soon as one file is processed, it will wait for the next execution time as determined by your trigger configuration. If you happened to have one or more files sitting in the local-directory, it would process those files before it would initiate communication with the remote FTP server. And, if the max-messages-per-poll were set to 1 (default), then it would be processing only one file at a time with intervals as defined by your trigger, essentially working as one-poll === one-file.

For typical file-transfer use cases, you most likely want the opposite behavior: to process all the files you can for each poll and only then wait for the next poll. If that is the case, set max-messages-per-poll to -1. Then, on each poll, the adapter will attempt to generate as many Messages as it possibly can. In other words, it will process everything in the local directory, and then it will connect to the remote directory to transfer everything that is available there to be processed locally. Only then is the poll operation considered complete, and the poller will wait for the next execution time.

You can alternatively set the max-messages-per-poll value to a positive value indicating the upward limit of Messages to be created from files with each poll. For example, a value of 10 means that on each poll it will attempt to process no more than 10 files.

15.4.1 Recovering from Failures

It is important to understand the architecture of the adapter. There is a file synchronizer which fetches the files, and a FileReadingMessageSource to emit a message for each synchronized file. As discussed above, there are two filters involved. The filter attribute (and patterns) refers to the remote (FTP) file list - to avoid fetching files that have already been fetched. The local-filter is used by the FileReadingMessageSource to determine which files are to be sent as messages.

The synchronizer lists the remote files and consults its filter; the files are then transferred. If an IO error occurs during file transfer, any files that have already been added to the filter are removed so they are eligible to be re-fetched on the next poll. This only applies if the filter implements ReversibleFileListFilter (such as the AcceptOnceFileListFilter).

If, after synchronizing the files, an error occurs on the downstream flow processing a file, there is no automatic rollback of the filter so the failed file will not be reprocessed by default.

If you wish to reprocess such files after a failure, you can use configuration similar to the following to facilitate the removal of the failed file from the filter. This will work for any ResettableFileListFilter.

<int-ftp:inbound-channel-adapter id="ftpAdapter"
        session-factory="ftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt"
        local-filter="acceptOnceFilter">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-ftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="@acceptOnceFilter.remove(payload)" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

15.4.2 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the inbound adapter using Java configuration:

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> ftpMessageSource() {
        FtpInboundFileSynchronizingMessageSource source =
                new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("ftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

15.4.3 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the inbound adapter using the Java DSL:

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        return IntegrationFlows
            .from(s -> s.ftp(this.ftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilename(f -> f.toUpperCase() + ".a")
                    .localDirectory(new File("d:\\ftp_files")),
                e -> e.id("ftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

15.5 FTP Streaming Inbound Channel Adapter

The streaming inbound channel adapter was introduced in version 4.3. This adapter produces message with payloads of type InputStream, allowing files to be fetched without writing to the local file system. Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed. The session is provided in the IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE header. Standard framework components, such as the FileSplitter and StreamTransformer will automatically close the session. See Section 14.5, “File Splitter” and the section called “Stream Transformer” for more information about these components.

<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            remote-file-separator="/"
            comparator="comparator"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>

Only one of filename-pattern, filename-regex or filter is allowed.

[Important]Important

Unlike the non-streaming inbound channel adapter, this adapter does not prevent duplicates by default. If you do not delete the remote file (e.g. using an outbound gateway with an rm command) and you wish to prevent the file being processed again, you can configure an FtpPersistentFileListFilter in the filter attribute. If you don’t actually want to persist the state, an in-memory SimpleMetadataStore can be used with the filter. If you wish to use a filename pattern (or regex) as well, use a CompositeFileListFilter.

15.5.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the inbound adapter using Java configuration:

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(), null);
        messageSource.setRemoteDirectory("ftpSource/");
        messageSource.setFilter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
                           "streaming"));
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer();
    }

    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(ftpSessionFactory());
    }

}

15.6 FTP Outbound Channel Adapter

The FTP Outbound Channel Adapter relies upon a MessageHandler implementation that will connect to the FTP server and initiate an FTP transfer for every file it receives in the payload of incoming Messages. It also supports several representations of the File so you are not limited only to java.io.File typed payloads. The FTP Outbound Channel Adapter supports the following payloads: 1) java.io.File - the actual file object; 2) byte[] - a byte array that represents the file contents; and 3) java.lang.String - text that represents the file contents.

<int-ftp:outbound-channel-adapter id="ftpOutbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    charset="UTF-8"
    remote-file-separator="/"
    auto-create-directory="true"
    remote-directory-expression="headers.['remote_dir']"
    temporary-remote-directory-expression="headers.['temp_remote_dir']"
    filename-generator="fileNameGenerator"
    use-temporary-filename="true"
    mode="REPLACE"/>

As you can see from the configuration above you can configure an FTP Outbound Channel Adapter via the outbound-channel-adapter element while also providing values for various attributes such as filename-generator (an implementation of the org.springframework.integration.file.FileNameGenerator strategy interface), a reference to a session-factory, as well as other attributes. You can also see some examples of *expression attributes which allow you to use SpEL to configure things like remote-directory-expression, temporary-remote-directory-expression and remote-filename-generator-expression (a SpEL alternative to filename-generator shown above). As with any component that allows the usage of SpEL, access to Payload and Message Headers is available via payload and headers variables. Please refer to the schema for more details on the available attributes.

[Note]Note

By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or if the payload of the Message is already a java.io.File, then it will use the original name of that file.

[Important]Important

Defining certain values (e.g., remote-directory) might be platform/ftp server dependent. For example as it was reported on this forum https://forum.spring.io/showthread.php?p=333478&posted=1#post333478 on some platforms you must add slash to the end of the directory definition (e.g., remote-directory="/foo/bar/" instead of remote-directory="/foo/bar")

Starting with version 4.1, you can specify the mode when transferring the file. By default, an existing file will be overwritten; the modes are defined on enum FileExistsMode, having values REPLACE (default), APPEND, IGNORE, and FAIL. With IGNORE and FAIL, the file is not transferred; FAIL causes an exception to be thrown whereas IGNORE silently ignores the transfer (although a DEBUG log entry is produced).

Avoiding Partially Written Files

One of the common problems, when dealing with file transfers, is the possibility of processing a partial file - a file might appear in the file system before its transfer is actually complete.

To deal with this issue, Spring Integration FTP adapters use a very common algorithm where files are transferred under a temporary name and then renamed once they are fully transferred.

By default, every file that is in the process of being transferred will appear in the file system with an additional suffix which, by default, is .writing; this can be changed using the temporary-file-suffix attribute.

However, there may be situations where you don’t want to use this technique (for example, if the server does not permit renaming files). For situations like this, you can disable this feature by setting use-temporary-file-name to false (default is true). When this attribute is false, the file is written with its final name and the consuming application will need some other mechanism to detect that the file is completely uploaded before accessing it.

15.6.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the Outbound Adapter using Java configuration:

@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                    new SpringApplicationBuilder(FtpJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToFtp(new File("/foo/bar.txt"));
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        FtpMessageHandler handler = new FtpMessageHandler(ftpSessionFactory());
        handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
        handler.setFileNameGenerator(new FileNameGenerator() {

            @Override
            public String generateFileName(Message<?> message) {
                 return "handlerContent.test";
            }

        });
        return handler;
    }

    @MessagingGateway
    public interface MyGateway {

         @Gateway(requestChannel = "toFtpChannel")
         void sendToFtp(File file);

    }
}

15.6.2 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the Outbound Adapter using the Java DSL:

@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
            new SpringApplicationBuilder(FtpJavaApplication.class)
                .web(false)
                .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToFtp(new File("/foo/bar.txt"));
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public IntegrationFlow ftpOutboundFlow() {
        return IntegrationFlows.from("toFtpChannel")
                .handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
                        .useTemporaryFileName(false)
                        .fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
                        .remoteDirectory(this.ftpServer.getTargetFtpDirectory().getName())
                ).get();
    }

    @MessagingGateway
    public interface MyGateway {

         @Gateway(requestChannel = "toFtpChannel")
         void sendToFtp(File file);

    }

}

15.7 FTP Outbound Gateway

The FTP Outbound Gateway provides a limited set of commands to interact with a remote FTP/FTPS server. Commands supported are:

  • ls (list files)
  • get (retrieve file)
  • mget (retrieve file(s))
  • rm (remove file(s))
  • mv (move/rename file)
  • put (send file)
  • mput (send multiple files)

ls

ls lists remote file(s) and supports the following options:

  • -1 - just retrieve a list of file names, default is to retrieve a list of FileInfo objects.
  • -a - include all files (including those starting with .)
  • -f - do not sort the list
  • -dirs - include directories (excluded by default)
  • -links - include symbolic links (excluded by default)
  • -R - list the remote directory recursively

In addition, filename filtering is provided, in the same manner as the inbound-channel-adapter.

The message payload resulting from an ls operation is a list of file names, or a list of FileInfo objects. These objects provide information such as modified time, permissions etc.

The remote directory that the ls command acted on is provided in the file_remoteDirectory header.

When using the recursive option (-R), the fileName includes any subdirectory elements, representing a relative path to the file (relative to the remote directory). If the -dirs option is included, each recursive directory is also returned as an element in the list. In this case, it is recommended that the -1 is not used because you would not be able to determine files Vs. directories, which is achievable using the FileInfo objects.

Starting with version 4.3, the FtpSession supports null for the list() and listNames() methods, therefore the expression attribute can be omitted. For Java configuration, there are two constructors without an expression argument for convenience. null for LS, PUT and MPUT commands is treated as the Client working directory according to the FTP protocol. All other commands must be supplied with the expression to evaluate remote path against request message. The working directory can be set via the FTPClient.changeWorkingDirectory() function when you extend the DefaultFtpSessionFactory and implement postProcessClientAfterConnect() callback.

get

get retrieves a remote file and supports the following option:

  • -P - preserve the timestamp of the remote file
  • -stream - retrieve the remote file as a stream.

The remote directory is provided in the file_remoteDirectory header, and the filename is provided in the file_remoteFile header.

The message payload resulting from a get operation is a File object representing the retrieved file, or an InputStream when the -stream option is provided. This option allows retrieving the file as a stream. For text files, a common use case is to combine this operation with a File Splitter or Stream Transformer. When consuming remote files as streams, the user is responsible for closing the Session after the stream is consumed. For convenience, the Session is provided in the IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE header, a convenience method is provided on the IntegrationMessageHeaderAccessor:

Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
    closeable.close();
}

Note: In previous releases the session was in the file_remoteSession header, but this is deprecated - use IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE instead.

Framework components such as the File Splitter and Stream Transformer will automatically close the session after the data is transferred.

The following shows an example of consuming a file as a stream:

<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
                            request-channel="inboundGetStream"
                            command="get"
                            command-options="-stream"
                            expression="payload"
                            remote-directory="ftpTarget"
                            reply-channel="stream" />

<int-file:splitter input-channel="stream" output-channel="lines" />

Note: if you consume the input stream in a custom component, you must close the Session. You can either do that in your custom code, or route a copy of the message to a service-activator and use SpEL:

<int:service-activator input-channel="closeSession"
    expression="headers[T(org.springframework.integration.IntegrationMessageHeaderAccessor).CLOSEABLE_RESOURCE].close()" />

mget

mget retrieves multiple remote files based on a pattern and supports the following option:

  • -P - preserve the timestamps of the remote files
  • -x - Throw an exception if no files match the pattern (otherwise an empty list is returned)

The message payload resulting from an mget operation is a List<File> object - a List of File objects, each representing a retrieved file.

The remote directory is provided in the file_remoteDirectory header, and the pattern for the file names is provided in the file_remoteFile header.

[Note]Notes for when using recursion (-R)

The pattern is ignored, and * is assumed. By default, the entire remote tree is retrieved. However, files in the tree can be filtered, by providing a`FileListFilter`; directories in the tree can also be filtered this way. A FileListFilter can be provided by reference or by filename-pattern or filename-regex attributes. For example, filename-regex="(subDir|.*1.txt)" will retrieve all files ending with 1.txt in the remote directory and the subdirectory subDir. If a subdirectory is filtered, no additional traversal of that subdirectory is performed.

The -dirs option is not allowed (the recursive mget uses the recursive ls to obtain the directory tree and the directories themselves cannot be included in the list).

Typically, you would use the #remoteDirectory variable in the local-directory-expression so that the remote directory structure is retained locally.

See also Section 15.7.3, “Outbound Gateway Partial Success (mget and mput)”.

put

put sends a file to the remote server; the payload of the message can be a java.io.File, a byte[] or a String. A remote-filename-generator (or expression) is used to name the remote file. Other available attributes include remote-directory, temporary-remote-directory (and their *-expression) equivalents, use-temporary-file-name, and auto-create-directory. Refer to the schema documentation for more information.

The message payload resulting from a put operation is a String representing the full path of the file on the server after transfer.

mput

mput sends multiple files to the server and supports the following option:

  • -R - Recursive - send all files (possibly filtered) in the directory and subdirectories

The message payload must be a java.io.File representing a local directory.

The same attributes as the put command are supported. In addition, files in the local directory can be filtered with one of mput-pattern, mput-regex or mput-filter. The filter works with recursion, as long as the subdirectories themselves pass the filter. Subdirectories that do not pass the filter are not recursed.

The message payload resulting from an mget operation is a List<String> object - a List of remote file paths resulting from the transfer.

See also Section 15.7.3, “Outbound Gateway Partial Success (mget and mput)”.

rm

The rm command has no options.

The message payload resulting from an rm operation is Boolean.TRUE if the remove was successful, Boolean.FALSE otherwise. The remote directory is provided in the file_remoteDirectory header, and the filename is provided in the file_remoteFile header.

mv

The mv command has no options.

The expression attribute defines the "from" path and the rename-expression attribute defines the "to" path. By default, the rename-expression is headers['file_renameTo']. This expression must not evaluate to null, or an empty String. If necessary, any remote directories needed will be created. The payload of the result message is Boolean.TRUE. The original remote directory is provided in the file_remoteDirectory header, and the filename is provided in the file_remoteFile header. The new path is in the file_renameTo header.

Additional Information

The get and mget commands support the local-filename-generator-expression attribute. It defines a SpEL expression to generate the name of local file(s) during the transfer. The root object of the evaluation context is the request Message but, in addition, the remoteFileName variable is also available, which is particularly useful for mget, for example: local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo".

The get and mget commands support the local-directory-expression attribute. It defines a SpEL expression to generate the name of local directory(ies) during the transfer. The root object of the evaluation context is the request Message but, in addition, the remoteDirectory variable is also available, which is particularly useful for mget, for example: local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.foo". This attribute is mutually exclusive with local-directory attribute.

For all commands, the PATH that the command acts on is provided by the expression property of the gateway. For the mget command, the expression might evaluate to , meaning retrieve all files, or somedirectory/ etc.

Here is an example of a gateway configured for an ls command…​

<int-ftp:outbound-gateway id="gateway1"
    session-factory="ftpSessionFactory"
    request-channel="inbound1"
    command="ls"
    command-options="-1"
    expression="payload"
    reply-channel="toSplitter"/>

The payload of the message sent to the toSplitter channel is a list of String objects containing the filename of each file. If the command-options was omitted, it would be a list of FileInfo objects. Options are provided space-delimited, e.g. command-options="-1 -dirs -links".

Starting with version 4.2, the GET, MGET, PUT and MPUT commands support a FileExistsMode property (mode when using the namespace support). This affects the behavior when the local file exists (GET and MGET) or the remote file exists (PUT and MPUT). Supported modes are REPLACE, APPEND, FAIL and IGNORE. For backwards compatibility, the default mode for PUT and MPUT operations is REPLACE and for GET and MGET operations, the default is FAIL.

15.7.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the Outbound Gateway using Java configuration:

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        FtpOutboundGateway ftpOutboundGateway = new FtpOutboundGateway(ftpSessionFactory(), "ls");
        ftpOutboundGateway.setOutputChannelName("lsReplyChannel");
        return ftpOutboundGateway;
    }

}

15.7.2 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the Outbound Gateway using the Java DSL:

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public FtpOutboundGatewaySpec ftpOutboundGateway() {
        return Ftp.outboundGateway(ftpSessionFactory(),
            AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
            .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
            .regexFileNameFilter("(subFtpSource|.*1.txt)")
            .localDirectoryExpression("'localDirectory/' + #remoteDirectory")
            .localFilenameExpression("#remoteFileName.replaceFirst('ftpSource', 'localTarget')");
    }

    @Bean
    public IntegrationFlow ftpMGetFlow(AbstractRemoteFileOutboundGateway<FTPFile> ftpOutboundGateway) {
        return f -> f
            .handle(ftpOutboundGateway)
            .channel(c -> c.queue("remoteFileOutputChannel"));
    }

}

15.7.3 Outbound Gateway Partial Success (mget and mput)

When performing operations on multiple files (mget and mput) it is possible that an exception occurs some time after one or more files have been transferred. In this case (starting with version 4.2), a PartialSuccessException is thrown. As well as the usual MessagingException properties (failedMessage and cause), this exception has two additional properties:

  • partialResults - the successful transfer results.
  • derivedInput - the list of files generated from the request message (e.g. local files to transfer for an mput).

This will enable you to determine which files were successfully transferred, and which were not.

In the case of a recursive mput, the PartialSuccessException may have nested PartialSuccessException s.

Consider:

root/
|- file1.txt
|- subdir/
   | - file2.txt
   | - file3.txt
|- zoo.txt

If the exception occurs on file3.txt, the PartialSuccessException thrown by the gateway will have derivedInput of file1.txt, subdir, zoo.txt and partialResults of file1.txt. It’s cause will be another PartialSuccessException with derivedInput of file2.txt, file3.txt and partialResults of file2.txt.

15.8 FTP Session Caching

[Important]Important

Starting with Spring Integration version 3.0, sessions are no longer cached by default; the cache-sessions attribute is no longer supported on endpoints. You must use a CachingSessionFactory (see below) if you wish to cache sessions.

In versions prior to 3.0, the sessions were cached automatically by default. A cache-sessions attribute was available for disabling the auto caching, but that solution did not provide a way to configure other session caching attributes. For example, you could not limit on the number of sessions created. To support that requirement and other configuration options, a CachingSessionFactory was provided. It provides sessionCacheSize and sessionWaitTimeout properties. As its name suggests, the sessionCacheSize property controls how many active sessions the factory will maintain in its cache (the DEFAULT is unbounded). If the sessionCacheSize threshold has been reached, any attempt to acquire another session will block until either one of the cached sessions becomes available or until the wait time for a Session expires (the DEFAULT wait time is Integer.MAX_VALUE). The sessionWaitTimeout property enables configuration of that value.

If you want your Sessions to be cached, simply configure your default Session Factory as described above and then wrap it in an instance of CachingSessionFactory where you may provide those additional properties.

<bean id="ftpSessionFactory" class="o.s.i.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="localhost"/>
</bean>

<bean id="cachingSessionFactory" class="o.s.i.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="ftpSessionFactory"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>

In the above example you see a CachingSessionFactory created with the sessionCacheSize set to 10 and the sessionWaitTimeout set to 1 second (its value is in milliseconds).

Starting with Spring Integration version 3.0, the CachingConnectionFactory provides a resetCache() method. When invoked, all idle sessions are immediately closed and in-use sessions are closed when they are returned to the cache. New requests for sessions will establish new sessions as necessary.

15.9 RemoteFileTemplate

Starting with Spring Integration version 3.0 a new abstraction is provided over the FtpSession object. The template provides methods to send, retrieve (as an InputStream), remove, and rename files. In addition an execute method is provided allowing the caller to execute multiple operations on the session. In all cases, the template takes care of reliably closing the session. For more information, refer to the JavaDocs for RemoteFileTemplate. There is a subclass for FTP: FtpRemoteFileTemplate.

Additional methods were added in version 4.1 including getClientInstance() which provides access to the underlying FTPClient enabling access to low-level APIs.

Not all FTP servers properly implement STAT <path> command, in that it can return a positive result for a non-existent path. The NLST command reliably returns the name, when the path is a file and it exists. However, this does not support checking that an empty directory exists since NLST always returns an empty list in this case, when the path is a directory. Since the template doesn’t know if the path represents a directory or not, it has to perform additional checks when the path does not appear to exist, when using NLST. This adds overhead, requiring several requests to the server. Starting with version 4.1.9 the FtpRemoteFileTemplate provides FtpRemoteFileTemplate.ExistsMode property with the following options:

  • STAT - Perform the STAT FTP command (FTPClient.getStatus(path)) to check the path existence; this is the default and requires that your FTP server properly supports the STAT command (with a path).
  • NLST - Perform the NLST FTP command - FTPClient.listName(path); use this if you are testing for a path that is a full path to a file; it won’t work for empty directories.
  • NLST_AND_DIRS - Perform the NLST command first and if it returns no files, fall back to a technique which temporarily switches the working directory using FTPClient.changeWorkingDirectory(path). See FtpSession.exists() for more information.

Since we know that the FileExistsMode.FAIL case is always only looking for a file (and not a directory), we safely use NLST mode for the FtpMessageHandler and FtpOutboundGateway components.

For any other cases the FtpRemoteFileTemplate can be extended for implementing a custom logic in the overridden exist() method.

15.10 MessageSessionCallback

Starting with Spring Integration version 4.2, a MessageSessionCallback<F, T> implementation can be used with the <int-ftp:outbound-gateway/> (FtpOutboundGateway) to perform any operation(s) on the Session<FTPFile> with the requestMessage context. It can be used for any non-standard or low-level FTP operation (or several); for example, allowing access from an integration flow definition, and functional interface (Lambda) implementation injection:

@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler ftpOutboundGateway(SessionFactory<FTPFile> sessionFactory) {
    return new FtpOutboundGateway(sessionFactory,
         (session, requestMessage) -> session.list(requestMessage.getPayload()));
}

Another example might be to pre- or post- process the file data being sent/retrieved.

When using XML configuration, the <int-ftp:outbound-gateway/> provides a session-callback attribute to allow you to specify the MessageSessionCallback bean name.

[Note]Note

The session-callback is mutually exclusive with the command and expression attributes. When configuring with Java, different constructors are available in the FtpOutboundGateway class.