28. SFTP Adapters

Spring Integration provides support for file transfer operations via SFTP.

28.1 Introduction

The Secure File Transfer Protocol (SFTP) is a network protocol which allows you to transfer files between two computers on the Internet over any reliable stream.

The SFTP protocol requires a secure channel, such as SSH, as well as visibility to a client’s identity throughout the SFTP session.

Spring Integration supports sending and receiving files over SFTP by providing three client-side endpoints: Inbound Channel Adapter, Outbound Channel Adapter, and Outbound Gateway. It also provides convenient namespace configuration to define these client components. To use the SFTP namespace, add the following declarations to your XML configuration:

xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp
    http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd"

28.2 SFTP Session Factory

Important

Starting with version 3.0, sessions are no longer cached by default. See Section 28.5, “SFTP Session Caching”.

Before configuring SFTP adapters, you must configure an SFTP Session Factory. You can configure the SFTP Session Factory via a regular bean definition:

<beans:bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="localhost"/>
    <beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
    <beans:property name="privateKeyPassphrase" value="springIntegration"/>
    <beans:property name="port" value="22"/>
    <beans:property name="user" value="kermit"/>
</beans:bean>

Every time an adapter requests a session object from its SessionFactory, a new SFTP session is created. Under the covers, the SFTP Session Factory relies on the JSch library to provide the SFTP capabilities.

However, Spring Integration also supports caching SFTP sessions. See Section 28.5, “SFTP Session Caching” for more information.

Important

JSch supports multiple channels (operations) over a connection to the server. By default, the Spring Integration session factory uses a separate physical connection for each channel. Since Spring Integration 3.0, you can configure the session factory (by using a boolean constructor argument, which defaults to false) to use a single connection to the server and create multiple JSch channels on that single connection.

When using this feature, you must wrap the session factory in a caching session factory, as described below, so that the connection is not physically closed when an operation completes.

If the cache is reset, the session is disconnected only when the last channel is closed.

The connection will be refreshed if it is found to be disconnected when a new operation obtains a session.
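To make the shared-connection behavior concrete, the following minimal sketch models it as a count of open channels. It is a hypothetical, self-contained model: SharedConnectionSketch and its methods are illustrative names, not JSch or Spring Integration API. Closing channels never drops the connection on its own (standing in for the caching wrapper); after a reset, the last channel to close disconnects it, and the next operation reconnects.

```java
/**
 * Hypothetical model (not JSch or Spring Integration API) of one physical
 * connection that multiplexes several channels, as with isSharedSession=true.
 */
public class SharedConnectionSketch {

    public static final class SharedConnection {
        private int openChannels;       // channels currently in use on this connection
        private boolean connected;      // whether the physical connection is up
        private boolean resetRequested; // set when the cache is reset
        private int connectCount;       // how many physical connections have been made

        /** Opens a channel, reconnecting first if the connection was found disconnected. */
        public synchronized void openChannel() {
            if (!connected) {
                connected = true;       // refresh the connection for this new operation
                resetRequested = false;
                connectCount++;
            }
            openChannels++;
        }

        /** Closes a channel; after a reset, the last channel to close disconnects. */
        public synchronized void closeChannel() {
            openChannels--;
            if (resetRequested && openChannels == 0) {
                connected = false;
            }
        }

        /** Requests disconnection; it happens immediately only if no channel is open. */
        public synchronized void reset() {
            resetRequested = true;
            if (openChannels == 0) {
                connected = false;
            }
        }

        public synchronized boolean isConnected() {
            return connected;
        }

        public synchronized int connectCount() {
            return connectCount;
        }
    }
}
```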

Note

If you experience connectivity problems and want to trace session creation and see which sessions are polled, you can enable tracing by setting the logger to TRACE level (e.g., log4j.category.org.springframework.integration.sftp=TRACE). See also Section 28.12, “SFTP/JSCH Logging”.

Now all you need to do is inject this SFTP Session Factory into your adapters.

Note

A more practical way to provide values for the SFTP Session Factory is to use Spring’s property placeholder support.

28.2.1 Configuration Properties

Below you will find all properties that are exposed by the DefaultSftpSessionFactory.

isSharedSession (constructor argument)

When true, a single connection will be used and JSch Channels will be multiplexed. Defaults to false.

clientVersion

Allows you to set the client version property. Its default depends on the underlying JSch version, but it looks like: SSH-2.0-JSCH-0.1.45

enableDaemonThread

If true, all threads are daemon threads. If false, normal non-daemon threads are used instead. This property is set on the underlying Session, where it defaults to false if not explicitly set.

host

The URL of the host you want to connect to. Mandatory.

hostKeyAlias

Sets the host key alias, used when comparing the host key to the known hosts list.

knownHosts

Specifies the filename to use for a host key repository. The file has the same format as OpenSSH’s known_hosts file. It is required, and must be pre-populated, if allowUnknownKeys is false.

password

The password to authenticate against the remote host. If a password is not provided, then the privateKey property is mandatory. Not allowed if userInfo is set; the password is obtained from that object.

port

The port over which the SFTP connection is established. If not specified, this value defaults to 22. If specified, this property must be a positive number.

privateKey

Allows you to set a Resource, which represents the location of the private key used for authenticating against the remote host. If the privateKey is not provided, then the password property is mandatory.

privateKeyPassphrase

The password for the private key. Not allowed if userInfo is set; the passphrase is obtained from that object. Optional.

proxy

Allows for specifying a JSch-based Proxy. If set, then the proxy object is used to create the connection to the remote host via the proxy. See Section 28.3, “Proxy Factory Bean” for a convenient way to configure the proxy.

serverAliveCountMax

Specifies the number of server-alive messages that can be sent without any reply from the server before disconnecting. If not set, this property defaults to 1.

serverAliveInterval

Sets the timeout interval (milliseconds) before a server alive message is sent, in case no message is received from the server.

sessionConfig

Using Properties, you can set additional configuration settings on the underlying JSch Session.

socketFactory

Allows you to pass in a SocketFactory. The socket factory is used to create a socket to the target host. When a proxy is used, the socket factory is passed to the proxy. By default, plain TCP sockets are used.

timeout

The timeout property is used as the socket timeout parameter, as well as the default connection timeout. Defaults to 0, which means that no timeout occurs.

user

The remote user to use. Mandatory.

allowUnknownKeys

Set to true to allow connections to hosts with unknown (or changed) keys. Defaults to false since version 4.2 (in 4.1.7 it defaulted to true, and it was not configurable before that version). Only applied if no userInfo is provided. If false, a pre-populated knownHosts file is required.

userInfo

Set a custom UserInfo used during authentication. In particular, be aware that promptYesNo() is invoked when an unknown (or changed) host key is received. See also allowUnknownKeys. When a UserInfo is provided, the password and private key passphrase are obtained from it, and the discrete password and privateKeyPassphrase properties cannot be set.

28.3 Proxy Factory Bean

JSch provides a mechanism to connect to the server through an HTTP or SOCKS proxy. To use this feature, configure the Proxy and set a reference to it on the DefaultSftpSessionFactory (the proxy property discussed above). JSch provides three implementations: HTTP, SOCKS4, and SOCKS5. Spring Integration 4.3 provides a FactoryBean that makes configuring these proxies easier by allowing property injection:

<bean id="proxySocks5" class="org.springframework.integration.sftp.session.JschProxyFactoryBean">
    <constructor-arg value="SOCKS5" />
    <constructor-arg value="${sftp.proxy.address}" />
    <constructor-arg value="${sftp.proxy.port}" />
    <constructor-arg value="${sftp.proxy.user}" />
    <constructor-arg value="${sftp.proxy.pw}" />
</bean>

<bean id="sessionFactory"
          class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory" >
    ...
    <property name="proxy" ref="proxySocks5" />
    ...
</bean>

28.4 Delegating Session Factory

Version 4.2 introduced the DelegatingSessionFactory, which allows the selection of the actual session factory at runtime. Prior to invoking the SFTP endpoint, call setThreadKey() on the factory to associate a key with the current thread. That key is then used to look up the actual session factory to be used. You can clear the key by calling clearThreadKey() after use.

Convenience methods have been added so this can easily be done from a message flow:

<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
    <constructor-arg>
        <bean class="org.springframework.integration.file.remote.session.DefaultSessionFactoryLocator">
            <!-- delegate factories here -->
        </bean>
    </constructor-arg>
</bean>

<int:service-activator input-channel="in" output-channel="c1"
        expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />

<int-sftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />

<int:service-activator input-channel="c2" output-channel="out"
        expression="@dsf.clearThreadKey(#root)" />

Important

When using session caching (see Section 28.5, “SFTP Session Caching”), each of the delegates should be cached; you cannot cache the DelegatingSessionFactory itself.
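The thread-key lookup can be pictured with a ThreadLocal. The following is a hypothetical sketch in plain Java, not the DelegatingSessionFactory source: SimpleSessionFactory and ThreadKeyDelegatingFactory are illustrative names, and falling back to a default factory for an unknown key is an assumption made for the example. As in the XML flow above, setThreadKey() and clearThreadKey() return the message unchanged so they can sit between other endpoints.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of thread-key delegation; not the Spring Integration DelegatingSessionFactory. */
public class DelegatingFactorySketch {

    /** Stand-in for a session factory that just reports which server a session would target. */
    public interface SimpleSessionFactory {
        String getSession();
    }

    public static final class ThreadKeyDelegatingFactory implements SimpleSessionFactory {
        private final Map<Object, SimpleSessionFactory> delegates = new ConcurrentHashMap<>();
        private final SimpleSessionFactory defaultFactory;
        private final ThreadLocal<Object> threadKey = new ThreadLocal<>();

        public ThreadKeyDelegatingFactory(Map<Object, SimpleSessionFactory> delegates,
                SimpleSessionFactory defaultFactory) {
            this.delegates.putAll(delegates);
            this.defaultFactory = defaultFactory;
        }

        /** Binds the key to the calling thread and returns the message unchanged. */
        public <M> M setThreadKey(M message, Object key) {
            threadKey.set(key);
            return message;
        }

        /** Removes the key from the calling thread. */
        public <M> M clearThreadKey(M message) {
            threadKey.remove();
            return message;
        }

        @Override
        public String getSession() {
            Object key = threadKey.get();
            SimpleSessionFactory delegate = key == null ? null : delegates.get(key);
            // assumption: a missing or unknown key falls back to the default factory
            return (delegate != null ? delegate : defaultFactory).getSession();
        }
    }
}
```

Because the key lives in a ThreadLocal in this sketch, the session must be requested on the same thread that called setThreadKey().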

28.5 SFTP Session Caching

Important

Starting with Spring Integration version 3.0, sessions are no longer cached by default; the cache-sessions attribute is no longer supported on endpoints. You must use a CachingSessionFactory (see below) if you wish to cache sessions.

In versions prior to 3.0, sessions were cached automatically by default. A cache-sessions attribute was available for disabling the auto caching, but that solution did not provide a way to configure other session caching attributes. For example, you could not limit the number of sessions created. To support that requirement and other configuration options, a CachingSessionFactory was provided. It provides sessionCacheSize and sessionWaitTimeout properties. As its name suggests, the sessionCacheSize property controls how many active sessions the factory maintains in its cache (the default is unbounded). If the sessionCacheSize threshold has been reached, any attempt to acquire another session blocks until either one of the cached sessions becomes available or the wait time for a session expires (the default wait time is Integer.MAX_VALUE). The sessionWaitTimeout property enables configuration of that value.

If you want your Sessions to be cached, simply configure your default Session Factory as described above and then wrap it in an instance of CachingSessionFactory where you may provide those additional properties.

<bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="localhost"/>
</bean>

<bean id="cachingSessionFactory"
    class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="sftpSessionFactory"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>

In the above example you see a CachingSessionFactory created with the sessionCacheSize set to 10 and the sessionWaitTimeout set to 1 second (its value is in milliseconds).

Starting with Spring Integration version 3.0, the CachingSessionFactory provides a resetCache() method. When invoked, all idle sessions are immediately closed, and in-use sessions are closed when they are returned to the cache. When using isSharedSession=true, the channel is closed, and the shared session is closed only when the last channel is closed. New requests for sessions establish new sessions as necessary.
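The cache size, wait timeout, and reset semantics described above can be modeled with a Semaphore. This is a hypothetical sketch, not the CachingSessionFactory implementation: FakeSession and BoundedSessionCache are illustrative names, and the "generation" counter is simply one way (assumed here) to mark sessions that were in use when resetCache() was called. Passing Integer.MAX_VALUE as the size approximates an unbounded cache.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a bounded session cache; not the Spring Integration CachingSessionFactory. */
public class SessionCacheSketch {

    /** Minimal stand-in for a remote session. */
    public static final class FakeSession {
        final int generation;           // cache generation the session was created in
        boolean open = true;

        FakeSession(int generation) {
            this.generation = generation;
        }

        public boolean isOpen() {
            return open;
        }
    }

    public static final class BoundedSessionCache {
        private final Semaphore permits;                // one permit per allowed session
        private final long waitTimeoutMillis;           // like sessionWaitTimeout
        private final Deque<FakeSession> idle = new ArrayDeque<>();
        private int generation;                         // incremented by resetCache()
        private int created;                            // number of sessions ever created

        public BoundedSessionCache(int sessionCacheSize, long sessionWaitTimeoutMillis) {
            this.permits = new Semaphore(sessionCacheSize, true);
            this.waitTimeoutMillis = sessionWaitTimeoutMillis;
        }

        /** Blocks until a session is available or the wait timeout expires. */
        public FakeSession getSession() {
            try {
                if (!permits.tryAcquire(waitTimeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("Timed out waiting for a session");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a session", e);
            }
            synchronized (this) {
                FakeSession cached = idle.poll();
                if (cached != null) {
                    return cached;                      // reuse an idle session
                }
                created++;
                return new FakeSession(generation);     // otherwise create a new one
            }
        }

        /** Returns a session; sessions handed out before a reset are closed instead of cached. */
        public void release(FakeSession session) {
            synchronized (this) {
                if (session.generation == generation) {
                    idle.push(session);
                } else {
                    session.open = false;
                }
            }
            permits.release();
        }

        /** Closes idle sessions now; in-use sessions are closed when they are released. */
        public synchronized void resetCache() {
            generation++;
            idle.forEach(s -> s.open = false);
            idle.clear();
        }

        public synchronized int created() {
            return created;
        }
    }
}
```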

28.6 RemoteFileTemplate

Starting with Spring Integration version 3.0, a new abstraction is provided over the SftpSession object. The template provides methods to send, retrieve (as an InputStream), remove, and rename files. In addition, an execute method lets the caller perform multiple operations on the session. In all cases, the template takes care of reliably closing the session. For more information, refer to the javadocs for RemoteFileTemplate. There is a subclass for SFTP: SftpRemoteFileTemplate.

Additional methods were added in version 4.1 including getClientInstance() which provides access to the underlying ChannelSftp enabling access to low-level APIs.

Starting with version 5.0, the new RemoteFileOperations.invoke(OperationsCallback<F, T> action) method is available. This method allows several RemoteFileOperations calls to be made in the scope of the same thread-bound Session. This is useful when you need to perform several high-level operations of the RemoteFileTemplate as one unit of work. For example, AbstractRemoteFileOutboundGateway uses it in the mput command implementation, where a put operation is performed for each file in the provided directory and, recursively, for its sub-directories. See the JavaDocs for more information.
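The following hypothetical sketch illustrates the thread-bound session idea behind invoke(). It does not use the real RemoteFileOperations or OperationsCallback types; Template, put(), and the string "sessions" are illustrative stand-ins. Outside invoke(), each operation opens and closes its own session; inside it, all operations reuse the session bound to the current thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of running several operations on one thread-bound session. */
public class ThreadBoundSessionSketch {

    public static final class Template {
        private final ThreadLocal<String> boundSession = new ThreadLocal<>();
        private final List<String> log = new ArrayList<>();
        private int sessionsOpened;

        /** One operation: uses the thread-bound session if there is one, otherwise its own. */
        public void put(String file) {
            String session = boundSession.get();
            boolean ownSession = session == null;
            if (ownSession) {
                session = openSession();
            }
            log.add(session + ":put " + file);
            if (ownSession) {
                closeSession(session);
            }
        }

        /** Binds one session to the current thread for the whole callback, then closes it. */
        public <T> T invoke(Function<Template, T> callback) {
            String session = openSession();
            boundSession.set(session);
            try {
                return callback.apply(this);
            } finally {
                boundSession.remove();
                closeSession(session);
            }
        }

        private String openSession() {
            return "session-" + (++sessionsOpened);
        }

        private void closeSession(String session) {
            log.add(session + ":close");
        }

        public int sessionsOpened() {
            return sessionsOpened;
        }

        public List<String> log() {
            return log;
        }
    }
}
```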

28.7 SFTP Inbound Channel Adapter

The SFTP Inbound Channel Adapter is a special listener that connects to the server and listens for remote directory events (for example, a new file being created), at which point it initiates a file transfer.

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
              session-factory="sftpSessionFactory"
            channel="requestChannel"
            filename-pattern="*.txt"
            remote-directory="/foo/bar"
            preserve-timestamp="true"
            local-directory="file:target/foo"
            auto-create-local-directory="true"
            local-filename-generator-expression="#this.toUpperCase() + '.a'"
            scanner="myDirScanner"
            local-filter="myFilter"
            temporary-file-suffix=".writing"
            max-fetch-size="-1"
            delete-remote-files="false">
        <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

As the preceding configuration shows, you can configure the SFTP Inbound Channel Adapter with the inbound-channel-adapter element and provide values for various attributes, such as local-directory (where files are transferred to), remote-directory (the remote source directory where files are transferred from), and session-factory (a reference to the bean configured earlier).

By default the transferred file will carry the same name as the original file. If you want to override this behavior you can set the local-filename-generator-expression attribute which allows you to provide a SpEL Expression to generate the name of the local file. Unlike outbound gateways and adapters where the root object of the SpEL Evaluation Context is a Message, this inbound adapter does not yet have the Message at the time of evaluation since that’s what it ultimately generates with the transferred file as its payload. So, the root object of the SpEL Evaluation Context is the original name of the remote file (String).
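As an illustration of the root-object rule, the following sketch models the local file name generator as a plain Java function of the remote file name. The class and constant names are hypothetical; the lambda is the Java equivalent of the SpEL expression #this.toUpperCase() + '.a' from the configuration above, so a remote file named report.txt would be stored locally as REPORT.TXT.a.

```java
import java.util.function.UnaryOperator;

/** Hypothetical sketch: local file naming modeled as a function of the remote file name. */
public class LocalFilenameSketch {

    /** Java equivalent of "#this.toUpperCase() + '.a'"; the input is the remote name (String). */
    public static final UnaryOperator<String> UPPER_CASE_PLUS_A =
            remoteName -> remoteName.toUpperCase() + ".a";

    /** Default behavior: the local file keeps the remote file's name. */
    public static final UnaryOperator<String> SAME_NAME = UnaryOperator.identity();

    public static String localName(String remoteName, UnaryOperator<String> generator) {
        return generator.apply(remoteName);
    }
}
```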

The inbound channel adapter first retrieves the file to a local directory and then emits each file according to the poller configuration. Starting with version 5.0, you can limit the number of files fetched from the SFTP server when new file retrievals are needed. This can be beneficial when the target files are very large or when running in a clustered system with a persistent file list filter (discussed below). Use max-fetch-size for this purpose; a negative value (the default) means no limit, and all matching files are retrieved. See Section 28.9, “Inbound Channel Adapters: Controlling Remote File Fetching” for more information. Since version 5.0, you can also provide a custom DirectoryScanner implementation to the inbound-channel-adapter via the scanner attribute.

Starting with Spring Integration 3.0, you can specify the preserve-timestamp attribute (default false); when true, the local file’s modified timestamp will be set to the value retrieved from the server; otherwise it will be set to the current time.
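A minimal sketch of what preserve-timestamp amounts to on the local side, using only java.nio.file. The class and method names are hypothetical, and the remote modification time is assumed to be given in seconds since the epoch (as SFTP file attributes report it).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the preserve-timestamp option using java.nio only. */
public class PreserveTimestampSketch {

    /** Writes the downloaded bytes and, if preserveTimestamp is true, copies the remote time. */
    public static void writeLocal(Path localFile, byte[] content, long remoteMtimeSeconds,
            boolean preserveTimestamp) {
        try {
            Files.write(localFile, content);
            if (preserveTimestamp) {
                Files.setLastModifiedTime(localFile,
                        FileTime.from(remoteMtimeSeconds, TimeUnit.SECONDS));
            }
            // otherwise the file keeps the time at which it was written locally
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes a temporary file and returns its resulting modification time in milliseconds. */
    public static long localModifiedMillis(long remoteMtimeSeconds, boolean preserveTimestamp) {
        try {
            Path file = Files.createTempFile("sftp-", ".txt");
            try {
                writeLocal(file, "data".getBytes(StandardCharsets.UTF_8),
                        remoteMtimeSeconds, preserveTimestamp);
                return Files.getLastModifiedTime(file).toMillis();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```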

Starting with version 4.2, you can specify remote-directory-expression instead of remote-directory, allowing you to dynamically determine the directory on each poll (for example, remote-directory-expression="@myBean.determineRemoteDir()").

Sometimes, file filtering based on the simple pattern specified via the filename-pattern attribute might not be sufficient. If this is the case, you can use the filename-regex attribute to specify a regular expression (for example, filename-regex=".*\.test$"). If you need complete control, you can use the filter attribute to provide a reference to a custom implementation of org.springframework.integration.file.filters.FileListFilter, a strategy interface for filtering a list of files. This filter determines which remote files are retrieved. You can also combine a pattern-based filter with other filters, such as an AcceptOnceFileListFilter to avoid synchronizing files that have previously been fetched, by using a CompositeFileListFilter.
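To show how these options relate, here is a hypothetical sketch of name-based filters in plain Java. NameFilter, pattern(), regex(), acceptOnce(), and allOf() are illustrative names, not the Spring Integration classes; the glob translation supports only * and ?, and allOf() applies filters in sequence, which is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical name-based filters; these are not the Spring Integration FileListFilter classes. */
public class RemoteFilterSketch {

    /** Stand-in for a file list filter that works on remote file names. */
    public interface NameFilter {
        List<String> filter(List<String> names);
    }

    /** Like filename-pattern: '*' matches any run of characters, '?' matches one character. */
    public static NameFilter pattern(String glob) {
        StringBuilder translated = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                translated.append(".*");
            } else if (c == '?') {
                translated.append('.');
            } else {
                translated.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return regex(translated.toString());
    }

    /** Like filename-regex: the whole name must match the regular expression. */
    public static NameFilter regex(String expression) {
        Pattern compiled = Pattern.compile(expression);
        return names -> {
            List<String> accepted = new ArrayList<>();
            for (String name : names) {
                if (compiled.matcher(name).matches()) {
                    accepted.add(name);
                }
            }
            return accepted;
        };
    }

    /** Accept-once: remembers names in memory (not thread-safe) and rejects repeats. */
    public static NameFilter acceptOnce() {
        Set<String> seen = new HashSet<>();
        return names -> {
            List<String> accepted = new ArrayList<>();
            for (String name : names) {
                if (seen.add(name)) {
                    accepted.add(name);
                }
            }
            return accepted;
        };
    }

    /** Combines filters: each filter sees only the names that passed the previous one. */
    public static NameFilter allOf(NameFilter... filters) {
        return names -> {
            List<String> current = new ArrayList<>(names);
            for (NameFilter filter : filters) {
                current = filter.filter(current);
            }
            return current;
        };
    }
}
```

Putting the pattern filter first means the accept-once filter only remembers names that actually matched.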

The AcceptOnceFileListFilter stores its state in memory. If you wish the state to survive a system restart, consider using the SftpPersistentAcceptOnceFileListFilter instead. This filter stores the accepted file names in an instance of the MetadataStore strategy (Section 10.5, “Metadata Store”). This filter matches on the filename and the remote modified time.

Since version 4.0, this filter requires a ConcurrentMetadataStore. When used with a shared data store (such as Redis with the RedisMetadataStore) this allows filter keys to be shared across multiple application or server instances.
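The following sketch illustrates why a ConcurrentMetadataStore is needed: a file is accepted only if an atomic putIfAbsent() (for a new name) or replace() (for a changed modified time) succeeds, so two instances sharing one store cannot both accept the same file version. A ConcurrentMap stands in for the metadata store; the class names and the key prefix are hypothetical, and this is a model of the idea rather than the SftpPersistentAcceptOnceFileListFilter source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of a persistent accept-once filter; a ConcurrentMap stands in for the metadata store. */
public class PersistentAcceptOnceSketch {

    /** Minimal remote file description: a name and a modification time. */
    public static final class RemoteFile {
        final String name;
        final long modified;

        public RemoteFile(String name, long modified) {
            this.name = name;
            this.modified = modified;
        }
    }

    public static final class PersistentAcceptOnceFilter {
        private final ConcurrentMap<String, String> store;  // may be shared by several instances
        private final String keyPrefix;                     // hypothetical prefix to namespace keys

        public PersistentAcceptOnceFilter(ConcurrentMap<String, String> store, String keyPrefix) {
            this.store = store;
            this.keyPrefix = keyPrefix;
        }

        public List<RemoteFile> filter(List<RemoteFile> files) {
            List<RemoteFile> accepted = new ArrayList<>();
            for (RemoteFile file : files) {
                String key = keyPrefix + file.name;
                String modified = Long.toString(file.modified);
                String previous = store.putIfAbsent(key, modified);
                // accept a never-seen file, or a known file whose modified time changed;
                // both checks are atomic, so only one filter sharing the store can win
                if (previous == null
                        || (!previous.equals(modified) && store.replace(key, previous, modified))) {
                    accepted.add(file);
                }
            }
            return accepted;
        }
    }
}
```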

Starting with version 5.0, the SftpPersistentAcceptOnceFileListFilter with an in-memory SimpleMetadataStore is applied by default for the SftpInboundFileSynchronizer. This filter is also applied together with the regex or pattern option in the XML configuration, as well as via SftpInboundChannelAdapterSpec in the Java DSL. For any other use case, use a CompositeFileListFilter (or ChainFileListFilter).

The above discussion refers to filtering the files before retrieving them. Once the files have been retrieved, an additional filter is applied to the files on the file system. By default, this is an AcceptOnceFileListFilter, which, as discussed, retains state in memory and does not consider the file’s modified time. Unless your application removes files after processing, the adapter will re-process the files on disk by default after an application restart.

Also, if you configure the filter to use an SftpPersistentAcceptOnceFileListFilter, and the remote file timestamp changes (causing it to be re-fetched), the default local filter will not allow this new file to be processed.

Use the local-filter attribute to configure the behavior of the local file system filter. Starting with version 4.3.8, a FileSystemPersistentAcceptOnceFileListFilter is configured by default. This filter stores the accepted file names and modified timestamp in an instance of the MetadataStore strategy (Section 10.5, “Metadata Store”), and will detect changes to the local file modified time. The default MetadataStore is a SimpleMetadataStore which stores state in memory.

Since version 4.1.5, these filters have a new property flushOnUpdate which will cause them to flush the metadata store on every update (if the store implements Flushable).

Important

Further, if you use a distributed MetadataStore (such as Section 25.5, “Redis Metadata Store” or Section 17.7, “Gemfire Metadata Store”) you can have multiple instances of the same adapter/application and be sure that one and only one will process a file.

The actual local filter is a CompositeFileListFilter containing the supplied filter and a pattern filter that prevents processing files that are in the process of being downloaded (based on the temporary-file-suffix); files are downloaded with this suffix (default: .writing) and the file is renamed to its final name when the transfer is complete, making it visible to the filter.

Please refer to the schema for more detail on these attributes.

It is also important to understand that SFTP Inbound Channel Adapter is a Polling Consumer and therefore you must configure a poller (either a global default or a local sub-element). Once the file has been transferred to a local directory, a Message with java.io.File as its payload type will be generated and sent to the channel identified by the channel attribute.
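The temporary-suffix mechanism described above can be sketched with java.nio.file: the file is written under its name plus ".writing" and atomically renamed when complete, while a local listing skips anything still carrying the suffix. The class and method names are hypothetical, and the sketch assumes the local file system supports atomic moves within one directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of downloading under a temporary suffix and hiding in-progress files. */
public class TemporarySuffixSketch {

    /** The documented default for temporary-file-suffix. */
    public static final String TEMPORARY_SUFFIX = ".writing";

    /** Writes to "name.writing", then renames to "name" once the transfer is complete. */
    public static Path download(Path localDir, String name, byte[] content) {
        try {
            Path temp = localDir.resolve(name + TEMPORARY_SUFFIX);
            Files.write(temp, content);
            return Files.move(temp, localDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Local pattern filter: lists file names, skipping those still carrying the suffix. */
    public static List<String> visibleFiles(Path localDir) {
        try (Stream<Path> files = Files.list(localDir)) {
            return files.map(path -> path.getFileName().toString())
                    .filter(fileName -> !fileName.endsWith(TEMPORARY_SUFFIX))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```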

More on File Filtering and Large Files

Sometimes a file that just appeared in the monitored (remote) directory is not complete. Typically, such a file is written with some temporary extension (for example, foo.txt.writing) and then renamed after the writing process completes. In most cases, you are interested only in complete files and would like to filter out the rest. To handle these scenarios, use the filtering support provided by the filename-pattern, filename-regex, and filter attributes. If you need a custom filter implementation, include a reference to it in your adapter via the filter attribute.

<int-sftp:inbound-channel-adapter id="sftpInboundAdapter"
            channel="receiveChannel"
            session-factory="sftpSessionFactory"
            filter="customFilter"
            local-directory="file:/local-test-dir"
            remote-directory="/remote-test-dir">
        <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

28.7.1 Recovering from Failures

It is important to understand the architecture of the adapter. There is a file synchronizer which fetches the files, and a FileReadingMessageSource to emit a message for each synchronized file. As discussed above, there are two filters involved. The filter attribute (and patterns) refers to the remote (SFTP) file list - to avoid fetching files that have already been fetched. The local-filter is used by the FileReadingMessageSource to determine which files are to be sent as messages.

The synchronizer lists the remote files and consults its filter; the files are then transferred. If an IO error occurs during file transfer, any files that have already been added to the filter are removed so they are eligible to be re-fetched on the next poll. This only applies if the filter implements ReversibleFileListFilter (such as the AcceptOnceFileListFilter).
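A hypothetical sketch of the rollback described above, in plain Java. ReversibleAcceptOnce and synchronize() are illustrative names rather than the framework's classes; in this model, a failed transfer forgets the failed file and every file after it in the same batch, while files transferred successfully stay in the filter.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a reversible accept-once filter and a synchronizer that uses it. */
public class RollbackSketch {

    public static final class ReversibleAcceptOnce {
        private final Set<String> seen = new LinkedHashSet<>();

        /** Accepts names not seen before and records them. */
        public List<String> filter(List<String> names) {
            List<String> accepted = new ArrayList<>();
            for (String name : names) {
                if (seen.add(name)) {
                    accepted.add(name);
                }
            }
            return accepted;
        }

        /** Forgets the failed file and every file after it in the same filtered batch. */
        public void rollback(String failed, List<String> filtered) {
            boolean forget = false;
            for (String name : filtered) {
                if (name.equals(failed)) {
                    forget = true;
                }
                if (forget) {
                    seen.remove(name);
                }
            }
        }
    }

    /** One synchronization pass; names in failing simulate an IO error during transfer. */
    public static List<String> synchronize(List<String> remote, ReversibleAcceptOnce filter,
            Set<String> failing) {
        List<String> filtered = filter.filter(remote);
        List<String> transferred = new ArrayList<>();
        for (String name : filtered) {
            if (failing.contains(name)) {
                filter.rollback(name, filtered);   // make the rest eligible for the next poll
                break;
            }
            transferred.add(name);
        }
        return transferred;
    }
}
```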

If, after synchronizing the files, an error occurs on the downstream flow processing a file, there is no automatic rollback of the filter so the failed file will not be reprocessed by default.

If you wish to reprocess such files after a failure, you can use configuration similar to the following to facilitate the removal of the failed file from the filter. This will work for any ResettableFileListFilter.

<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

Starting with version 5.0, the Inbound Channel Adapter can build sub-directories locally according to the generated local file name, which can be a remote sub-path as well. To read the local directory recursively and detect modifications throughout the hierarchy, the internal FileReadingMessageSource can now be supplied with a new RecursiveDirectoryScanner based on the Files.walk() algorithm. See AbstractInboundFileSynchronizingMessageSource.setScanner() for more information. Also, the AbstractInboundFileSynchronizingMessageSource can now be switched to the WatchService-based DirectoryScanner via the setUseWatchService() option. It is also configured for all the WatchEventType values, to react to any modifications in the local directory. The reprocessing sample above relies on the built-in functionality of FileReadingMessageSource.WatchServiceDirectoryScanner to perform ResettableFileListFilter.remove() when the file is deleted (StandardWatchEventKinds.ENTRY_DELETE) from the local directory. See Section 15.2.2, “WatchServiceDirectoryScanner” for more information.
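For orientation, a recursive scan based on Files.walk() can be as small as the following sketch. It is not the RecursiveDirectoryScanner source; the class name is hypothetical, and it simply lists regular files at any depth as paths relative to the local root (normalized to forward slashes).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical recursive scan of a local directory using Files.walk(). */
public class RecursiveScanSketch {

    /** Lists regular files under root at any depth, as sorted paths relative to root. */
    public static List<String> scan(Path root) {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                    .map(p -> root.relativize(p).toString().replace('\\', '/'))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```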

28.7.2 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the inbound adapter using Java configuration:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22); // the default SFTP port; replace with your server's port
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

28.7.3 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the inbound adapter using the Java DSL:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlows
            .from(s -> s.sftp(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

28.7.4 Dealing With Incomplete Data

See Section 15.2.7, “Dealing With Incomplete Data”.

The SftpSystemMarkerFilePresentFileListFilter is provided to filter remote files that don’t have the corresponding marker file on the remote system. See the javadocs for configuration information.
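The marker-file idea can be sketched on a plain list of remote names: a data file is passed only when a file with the same name plus a marker suffix is also present. The class name and the .complete suffix are assumptions for illustration; check the SftpSystemMarkerFilePresentFileListFilter javadocs for the actual defaults.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical marker-file check on a list of remote names. */
public class MarkerFileSketch {

    /** Assumed marker suffix for this sketch only. */
    public static final String MARKER_SUFFIX = ".complete";

    /** Keeps data files whose marker ("name" + suffix) appears in the same listing. */
    public static List<String> filesWithMarkers(List<String> remoteNames) {
        Set<String> names = new HashSet<>(remoteNames);
        List<String> ready = new ArrayList<>();
        for (String name : remoteNames) {
            if (!name.endsWith(MARKER_SUFFIX) && names.contains(name + MARKER_SUFFIX)) {
                ready.add(name);
            }
        }
        return ready;
    }
}
```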

28.8 SFTP Streaming Inbound Channel Adapter

The streaming inbound channel adapter was introduced in version 4.3. This adapter produces messages with payloads of type InputStream, allowing files to be fetched without writing to the local file system. Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed. The session is provided in the closeableResource header (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE). Standard framework components, such as the FileSplitter and StreamTransformer, automatically close the session. See Section 15.5, “File Splitter” and the section called “Stream Transformer” for more information about these components.

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

Only one of filename-pattern, filename-regex, filter or filter-expression is allowed.

Important

Starting with version 5.0, by default, the SftpStreamingMessageSource adapter prevents duplicates for remote files via an SftpPersistentAcceptOnceFileListFilter based on the in-memory SimpleMetadataStore. This filter is also applied by default together with the filename pattern (or regex). If there is a requirement to allow duplicates, you can use the AcceptAllFileListFilter. For any other use case, use a CompositeFileListFilter (or ChainFileListFilter). The Java configuration below shows one technique to remove the remote file after processing, avoiding duplicates.

Use the max-fetch-size attribute to limit the number of files fetched on each poll when a fetch is necessary; set to 1 and use a persistent filter when running in a clustered environment; see Section 28.9, “Inbound Channel Adapters: Controlling Remote File Fetching” for more information.

The adapter puts the remote directory and file name in the FileHeaders.REMOTE_DIRECTORY and FileHeaders.REMOTE_FILE headers, respectively. Starting with version 5.0, additional remote file information, in JSON, is provided in the FileHeaders.REMOTE_FILE_INFO header. If you set the fileInfoJson property on the SftpStreamingMessageSource to false, the header contains an SftpFileInfo object. The LsEntry object provided by the underlying JSch library can be accessed by using the SftpFileInfo.getFileInfo() method. The fileInfoJson property is not available when using XML configuration, but you can set it by injecting the SftpStreamingMessageSource into one of your configuration classes.
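When you consume the stream yourself rather than through a framework component, the session must be closed even if reading fails. The following hypothetical sketch models a message as an InputStream payload plus a header map and closes the closeableResource header value in a finally block; the class and method names are illustrative, not Spring Integration API.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical consumer of a streaming message, modeled as a payload plus a header map. */
public class StreamingConsumerSketch {

    /** Header name from the text above (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE). */
    public static final String CLOSEABLE_RESOURCE = "closeableResource";

    /** Reads the payload as UTF-8 text and always closes the session carried in the headers. */
    public static String consume(InputStream payload, Map<String, Object> headers) {
        try {
            return readUtf8(payload);
        } finally {
            Object resource = headers.get(CLOSEABLE_RESOURCE);
            if (resource instanceof Closeable) {
                try {
                    ((Closeable) resource).close();    // releases the remote session
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
    }

    private static String readUtf8(InputStream in) {
        try (InputStream stream = in) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = stream.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```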

28.8.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the inbound adapter using Java configuration:

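import java.io.InputStream;

import com.jcraft.jsch.ChannelSftp.LsEntry;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.sftp.inbound.SftpStreamingMessageSource;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.integration.transformer.StreamTransformer;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        // placeholder connection settings; adjust for your server
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    // an explicit poller is required unless a default poller bean is configured
    @Bean
    @InboundChannelAdapter(channel = "stream", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<InputStream> sftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    // converts the InputStream payload to a String
    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    // removes the remote file after the handler has processed the message successfully
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}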

Notice that, in this example, the message handler downstream of the transformer has an advice that removes the remote file after processing.

28.9 Inbound Channel Adapters: Controlling Remote File Fetching

There are two properties that should be considered when configuring inbound channel adapters. max-messages-per-poll, as with all pollers, can be used to limit the number of messages emitted on each poll (if more than the configured value are ready). max-fetch-size (since version 5.0) can limit the number of files retrieved from the remote server at a time.

The following scenarios assume the starting state is an empty local directory.

  • With max-messages-per-poll=2 and max-fetch-size=1, the adapter fetches one file, emits it, fetches the next file, emits it, and then sleeps until the next poll.
  • With max-messages-per-poll=2 and max-fetch-size=2, the adapter fetches both files, then emits each one.
  • With max-messages-per-poll=2 and max-fetch-size=4, the adapter fetches up to 4 files (if available) and emits the first two (if there are at least two); the next two files are emitted on the next poll.
  • With max-messages-per-poll=2 and max-fetch-size not specified, the adapter fetches all remote files and emits the first two (if there are at least two); the subsequent files are emitted on subsequent polls (two at a time). When all are consumed, the remote fetch is attempted again to pick up any new files.
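The four scenarios can be reproduced with a small, hypothetical model. FetchPollSimulator is not part of Spring Integration. It assumes that the adapter fetches only when its local queue is empty and emits at most max-messages-per-poll messages per poll, which is how the scenarios above read.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical model of one inbound adapter: files are fetched into a local queue
 * (at most maxFetchSize at a time) and emitted at most maxMessagesPerPoll per poll.
 */
final class FetchPollSimulator {

    static final int UNLIMITED = -1; // models "max-fetch-size not specified"

    private final Deque<String> remote;
    private final Deque<String> local = new ArrayDeque<>();
    private final int maxMessagesPerPoll;
    private final int maxFetchSize;
    private final List<String> log = new ArrayList<>();

    FetchPollSimulator(List<String> remoteFiles, int maxMessagesPerPoll, int maxFetchSize) {
        this.remote = new ArrayDeque<>(remoteFiles);
        this.maxMessagesPerPoll = maxMessagesPerPoll;
        this.maxFetchSize = maxFetchSize;
    }

    /** One poll: emit up to maxMessagesPerPoll files, fetching only when nothing is queued locally. */
    void poll() {
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            if (local.isEmpty()) {
                fetch();
            }
            String next = local.poll();
            if (next == null) {
                break; // nothing local and nothing left remotely
            }
            log.add("emit " + next);
        }
        log.add("sleep");
    }

    private void fetch() {
        int limit = (maxFetchSize == UNLIMITED) ? Integer.MAX_VALUE : maxFetchSize;
        for (int n = 0; n < limit && !remote.isEmpty(); n++) {
            String file = remote.poll();
            local.add(file);
            log.add("fetch " + file);
        }
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        // third scenario: max-messages-per-poll=2, max-fetch-size=4
        FetchPollSimulator sim = new FetchPollSimulator(List.of("a", "b", "c", "d"), 2, 4);
        sim.poll();
        sim.poll();
        // prints [fetch a, fetch b, fetch c, fetch d, emit a, emit b, sleep, emit c, emit d, sleep]
        System.out.println(sim.log());
    }
}
```

Changing the two constructor arguments reproduces the other scenarios. For example, (2, 1) logs a fetch before each emit.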
Important

When deploying multiple instances of an application, a small max-fetch-size is recommended to avoid one instance "grabbing" all the files and starving other instances.

Another use for max-fetch-size is if you want to stop fetching remote files, but continue to process files that have already been fetched. Setting the maxFetchSize property on the MessageSource (programmatically, via JMX, or via a control bus) effectively stops the adapter from fetching more files, but allows the poller to continue to emit messages for files that have previously been fetched. If the poller is active when the property is changed, the change will take effect on the next poll.

28.10 SFTP Outbound Channel Adapter

The SFTP Outbound Channel Adapter is a special MessageHandler that connects to the remote directory and initiates a file transfer for every file it receives as the payload of an incoming Message. It also supports several representations of a file, so you are not limited to java.io.File objects. Like the FTP outbound adapter, the SFTP Outbound Channel Adapter supports the following payloads: 1) java.io.File - the actual file object; 2) byte[] - a byte array that represents the file contents; 3) java.lang.String - text that represents the file contents.

<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
    session-factory="sftpSessionFactory"
    channel="inputChannel"
    charset="UTF-8"
    remote-file-separator="/"
    remote-directory="foo/bar"
    remote-filename-generator-expression="payload.getName() + '-foo'"
    use-temporary-file-name="true"
    chmod="600"
    mode="REPLACE"/>

As the preceding configuration shows, you can configure the SFTP Outbound Channel Adapter by using the outbound-channel-adapter element. See the schema for more detail on these attributes.

SpEL and the SFTP Outbound Adapter

As with many other components in Spring Integration, you can use the Spring Expression Language (SpEL) when configuring an SFTP Outbound Channel Adapter by specifying two attributes: remote-directory-expression and remote-filename-generator-expression. The expression evaluation context has the Message as its root object, so you can provide expressions that dynamically compute the file name or the existing directory path from data in the Message (either from the payload or the headers). In the preceding example, the remote-filename-generator-expression attribute computes the file name from the file's original name and appends the suffix -foo.

Starting with version 4.1, you can specify the mode to use when transferring the file. By default, an existing file is overwritten. The modes are defined by the FileExistsMode enum, which has the values REPLACE (default), APPEND, IGNORE, and FAIL. With IGNORE and FAIL, the file is not transferred: FAIL causes an exception to be thrown, whereas IGNORE silently skips the transfer (although a DEBUG log entry is produced).
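To make the four modes concrete, here is a toy model in which a Map stands in for the remote directory. The Mode enum only mirrors the FileExistsMode values named above, and TransferModes is a hypothetical class. How the real adapter performs APPEND over SFTP is not modeled.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the four transfer modes; a Map stands in for the remote directory. */
final class TransferModes {

    enum Mode { REPLACE, APPEND, IGNORE, FAIL } // mirrors the FileExistsMode values named above

    private final Map<String, byte[]> remoteDir = new HashMap<>();

    void transfer(String name, byte[] content, Mode mode) {
        byte[] existing = remoteDir.get(name);
        if (existing == null || mode == Mode.REPLACE) {
            remoteDir.put(name, content.clone()); // new file, or overwrite (the default)
            return;
        }
        switch (mode) {
            case APPEND: {
                byte[] combined = new byte[existing.length + content.length];
                System.arraycopy(existing, 0, combined, 0, existing.length);
                System.arraycopy(content, 0, combined, existing.length, content.length);
                remoteDir.put(name, combined);
                break;
            }
            case IGNORE:
                // not transferred; the real adapter only writes a DEBUG log entry
                break;
            case FAIL:
                throw new IllegalStateException("Remote file already exists: " + name);
            default:
                throw new AssertionError(mode);
        }
    }

    String read(String name) {
        return new String(remoteDir.get(name), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        TransferModes sftp = new TransferModes();
        sftp.transfer("a.txt", "one".getBytes(StandardCharsets.UTF_8), Mode.REPLACE);
        sftp.transfer("a.txt", "two".getBytes(StandardCharsets.UTF_8), Mode.APPEND);
        sftp.transfer("a.txt", "three".getBytes(StandardCharsets.UTF_8), Mode.IGNORE);
        System.out.println(sftp.read("a.txt")); // onetwo
    }
}
```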

Avoiding Partially Written Files

A common problem when dealing with file transfers is the possibility of processing a partial file: a file might appear in the file system before its transfer is actually complete.

To deal with this issue, the Spring Integration SFTP adapters use a common algorithm: files are transferred under a temporary name and then renamed once they are fully transferred.

By default, every file that is being transferred appears in the file system with an additional suffix, .writing. You can change this suffix by using the temporary-file-suffix attribute.

However, there may be situations where you don’t want to use this technique (for example, if the server does not permit renaming files). For situations like this, you can disable this feature by setting use-temporary-file-name to false (default is true). When this attribute is false, the file is written with its final name and the consuming application will need some other mechanism to detect that the file is completely uploaded before accessing it.
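The temporary-name technique is easy to reproduce on a local file system. The following sketch uses java.nio to write under the default .writing suffix and then rename the file. TempThenRename and writeSafely are illustrative names. The atomic rename is a local-file-system guarantee that may not hold for every SFTP server.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Local-filesystem sketch of "write under a temporary name, then rename". */
final class TempThenRename {

    static Path writeSafely(Path dir, String fileName, byte[] content, String tempSuffix) throws IOException {
        Path temp = dir.resolve(fileName + tempSuffix);
        Path target = dir.resolve(fileName);
        // consumers that ignore *.writing never see a partially written file
        Files.write(temp, content);
        // rename only once the content is complete; ATOMIC_MOVE throws
        // AtomicMoveNotSupportedException instead of silently copying
        return Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("upload");
        Path result = writeSafely(dir, "report.csv", "a,b\n".getBytes(StandardCharsets.UTF_8), ".writing");
        System.out.println(Files.exists(result) + " " + Files.exists(dir.resolve("report.csv.writing"))); // true false
    }
}
```

A consuming process polling the same directory would be configured to skip names that end in the temporary suffix.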

Version 4.3 introduced the chmod attribute, which changes the remote file permissions after upload. Use the conventional Unix octal format (for example, 600 allows read-write for the file owner only). When configuring the adapter in Java, you can use setChmodOctal("600") or setChmod(0600).
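The octal string "600", the Java octal literal 0600, and the decimal value 384 all describe the same permission bits. This can be checked with plain Java. ChmodBits is a hypothetical helper, not part of the framework.

```java
/** Converts between octal permission strings, int values, and ls-style notation. */
final class ChmodBits {

    static int fromOctal(String octal) {
        return Integer.parseInt(octal, 8);
    }

    /** Renders the low nine permission bits as an ls-style string, e.g. rw------- */
    static String symbolic(int mode) {
        String flags = "rwxrwxrwx";
        StringBuilder sb = new StringBuilder(9);
        for (int i = 0; i < 9; i++) {
            int bit = 1 << (8 - i); // owner read is bit 8, others execute is bit 0
            sb.append((mode & bit) != 0 ? flags.charAt(i) : '-');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int mode = fromOctal("600");
        System.out.println(mode + " " + (mode == 0600) + " " + symbolic(mode)); // 384 true rw-------
    }
}
```

The leading zero matters in Java source: 0600 is an octal literal, whereas 600 would be interpreted as decimal.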

28.10.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the Outbound Adapter using Java configuration:

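import java.io.File;

import com.jcraft.jsch.ChannelSftp.LsEntry;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.FileNameGenerator;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.outbound.SftpMessageHandler;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
@IntegrationComponentScan
public class SftpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                    new SpringApplicationBuilder(SftpJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToSftp(new File("/foo/bar.txt"));
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22); // placeholder: use your server's port
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel")
    public MessageHandler handler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        // the remote directory is taken from the 'remote-target-dir' header
        handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
        // every uploaded file gets the same fixed remote name
        handler.setFileNameGenerator(new FileNameGenerator() {

            @Override
            public String generateFileName(Message<?> message) {
                return "handlerContent.test";
            }

        });
        return handler;
    }

    @MessagingGateway
    public interface MyGateway {

        @Gateway(requestChannel = "toSftpChannel")
        void sendToSftp(File file);

    }
}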

28.10.2 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the Outbound Adapter using the Java DSL:

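import com.jcraft.jsch.ChannelSftp.LsEntry;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.sftp.dsl.Sftp;

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    // assumes a SessionFactory<LsEntry> bean is defined elsewhere in the application context
    @Bean
    public IntegrationFlow sftpOutboundFlow(SessionFactory<LsEntry> sftpSessionFactory) {
        return IntegrationFlows.from("toSftpChannel")
            .handle(Sftp.outboundAdapter(sftpSessionFactory, FileExistsMode.FAIL)
                         .useTemporaryFileName(false)
                         .remoteDirectory("/foo")
            ).get();
    }

}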

28.11 SFTP Outbound Gateway

The SFTP Outbound Gateway provides a limited set of commands to interact with a remote SFTP server. Commands supported are:

  • ls (list files)
  • nlst (list file names)
  • get (retrieve file)
  • mget (retrieve file(s))
  • rm (remove file(s))
  • mv (move/rename file)
  • put (send file)
  • mput (send multiple files)

ls

ls lists remote file(s) and supports the following options:

  • -1 - retrieve a list of file names only (the default is to retrieve a list of FileInfo objects)
  • -a - include all files (including those starting with .)
  • -f - do not sort the list
  • -dirs - include directories (excluded by default)
  • -links - include symbolic links (excluded by default)
  • -R - list the remote directory recursively

In addition, filename filtering is provided, in the same manner as the inbound-channel-adapter.

The message payload resulting from an ls operation is a list of file names, or a list of FileInfo objects. These objects provide information such as modified time, permissions etc.

The remote directory that the ls command acted on is provided in the file_remoteDirectory header.

When using the recursive option (-R), the fileName includes any subdirectory elements and represents a path relative to the remote directory. If the -dirs option is included, each recursive directory is also returned as an element in the list. In this case, we recommend not using the -1 option, because you would not be able to distinguish files from directories; the FileInfo objects make that distinction possible.
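The following local-file-system analogue (java.nio instead of SFTP) shows why names alone are ambiguous in a recursive listing that includes directories: only the type information reveals that sub is a directory. RecursiveListing is a hypothetical class, and the [dir]/[file] tags stand in for the information a FileInfo object carries.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Local analogue of a recursive listing that includes directories; not the gateway's code. */
final class RecursiveListing {

    static List<String> list(Path root, boolean namesOnly) throws IOException {
        try (Stream<Path> walk = Files.walk(root)) {
            return walk.filter(p -> !p.equals(root)) // skip the listed directory itself
                    .map(p -> {
                        // each entry is a path relative to the listed directory
                        String rel = root.relativize(p).toString().replace('\\', '/');
                        // names only (like -1): a directory "sub" and a file "sub" would look identical
                        return namesOnly ? rel : rel + (Files.isDirectory(p) ? " [dir]" : " [file]");
                    })
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("remote");
        Files.createFile(root.resolve("a.txt"));
        Files.createDirectories(root.resolve("sub"));
        Files.createFile(root.resolve("sub").resolve("b.txt"));
        System.out.println(list(root, true));  // [a.txt, sub, sub/b.txt]
        System.out.println(list(root, false)); // [a.txt [file], sub [dir], sub/b.txt [file]]
    }
}
```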

nlst

(Since version 5.0)

Lists remote file names and supports the following option:

  • -f - do not sort the list

The message payload resulting from an nlst operation is a list of file names.

The remote directory that the nlst command acted on is provided in the file_remoteDirectory header.

The SFTP protocol does not provide a list-names operation, so this command is equivalent to the ls command with the -1 option and is provided for convenience.

get

get retrieves a remote file and supports the following options:

  • -P - preserve the timestamp of the remote file.
  • -stream - retrieve the remote file as a stream.
  • -D - delete the remote file after successful transfer. The remote file is NOT deleted if the transfer is ignored because the FileExistsMode is IGNORE and the local file already exists.

The remote directory is provided in the file_remoteDirectory header, and the filename is provided in the file_remoteFile header.

The message payload resulting from a get operation is a File object representing the retrieved file, or an InputStream when the -stream option is provided. The -stream option allows retrieving the file as a stream. For text files, a common use case is to combine this operation with a File Splitter or a Stream Transformer. When consuming remote files as streams, you are responsible for closing the Session after the stream is consumed. For convenience, the Session is provided in the closeableResource header, and IntegrationMessageHeaderAccessor provides a convenience method to retrieve it:

Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
    closeable.close();
}

Framework components such as the File Splitter and Stream Transformer will automatically close the session after the data is transferred.

The following shows an example of consuming a file as a stream:

<int-sftp:outbound-gateway session-factory="sftpSessionFactory"
                            request-channel="inboundGetStream"
                            command="get"
                            command-options="-stream"
                            expression="payload"
                            remote-directory="ftpTarget"
                            reply-channel="stream" />

<int-file:splitter input-channel="stream" output-channel="lines" />

Note: if you consume the input stream in a custom component, you must close the Session. You can either do that in your custom code, or route a copy of the message to a service-activator and use SpEL:

<int:service-activator input-channel="closeSession"
    expression="headers['closeableResource'].close()" />
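The following sketch shows the consuming side of that contract in plain Java. A hypothetical readLines method reads the streamed payload and closes the accompanying session (any Closeable, standing in for the closeableResource header) in a try-with-resources block, so the session is released even if reading fails. StreamConsumer is an illustrative name, not a framework type.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical custom consumer of a streamed "get" reply; not Spring Integration code. */
final class StreamConsumer {

    static List<String> readLines(InputStream payload, Closeable session) throws IOException {
        List<String> lines = new ArrayList<>();
        // resources close in reverse order: the reader (and stream) first, then the session
        try (Closeable s = session;
             BufferedReader reader = new BufferedReader(new InputStreamReader(payload, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        boolean[] closed = {false};
        InputStream in = new ByteArrayInputStream("line 1\nline 2\n".getBytes(StandardCharsets.UTF_8));
        List<String> lines = readLines(in, () -> closed[0] = true);
        System.out.println(lines + " sessionClosed=" + closed[0]); // [line 1, line 2] sessionClosed=true
    }
}
```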

mget

mget retrieves multiple remote files based on a pattern and supports the following options:

  • -P - preserve the timestamps of the remote files.
  • -R - retrieve the entire directory tree recursively.
  • -x - throw an exception if no files match the pattern (otherwise, an empty list is returned).
  • -D - delete each remote file after successful transfer. The remote file is NOT deleted if the transfer is ignored because the FileExistsMode is IGNORE and the local file already exists.

The message payload resulting from an mget operation is a List<File> object - a List of File objects, each representing a retrieved file.

Important

Starting with version 5.0, if the FileExistsMode is IGNORE, the payload of the output message no longer contains files that were not fetched because they already existed locally. Previously, the list contained all files, including those that already existed.

The expression used to determine the remote path should produce a result that ends with * - e.g. foo/* will fetch the complete tree under foo.

Starting with version 5.0, a recursive MGET, combined with the new FileExistsMode.REPLACE_IF_MODIFIED mode, can be used to periodically synchronize an entire remote directory tree locally. This mode will set the local file last modified timestamp to the remote file timestamp, regardless of the -P (preserve timestamp) option.
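The REPLACE_IF_MODIFIED behavior can be approximated locally: copy only when the local file is missing or its timestamp differs, then stamp the copy with the source timestamp. This sketch assumes that "modified" means "the timestamps differ" and compares them at millisecond precision. A local directory stands in for the remote tree, and ReplaceIfModifiedSync is a hypothetical name.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;

/** Local sketch of timestamp-based synchronization; not the gateway's implementation. */
final class ReplaceIfModifiedSync {

    /** Copies source to target only if target is missing or its timestamp differs; returns true if copied. */
    static boolean sync(Path source, Path target) throws IOException {
        FileTime sourceTime = Files.getLastModifiedTime(source);
        if (Files.exists(target)
                && Files.getLastModifiedTime(target).toMillis() == sourceTime.toMillis()) {
            return false; // unchanged since the last sync: skip the transfer
        }
        Files.createDirectories(target.toAbsolutePath().getParent());
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        // stamp the copy with the source time so the next comparison sees "unchanged"
        Files.setLastModifiedTime(target, sourceTime);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("sync");
        Path remote = Files.write(dir.resolve("remote.txt"), "v1".getBytes(StandardCharsets.UTF_8));
        Path local = dir.resolve("local").resolve("remote.txt");
        System.out.println(sync(remote, local)); // true: first copy
        System.out.println(sync(remote, local)); // false: timestamps match
    }
}
```

Setting the local timestamp unconditionally is what makes repeated runs cheap. Without it, every comparison would report a difference and re-copy the file.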

Notes for when using recursion (-R)

The pattern is ignored, and * is assumed. By default, the entire remote tree is retrieved. However, files in the tree can be filtered, by providing a FileListFilter; directories in the tree can also be filtered this way. A FileListFilter can be provided by reference or by filename-pattern or filename-regex attributes. For example, filename-regex="(subDir|.*1.txt)" will retrieve all files ending with 1.txt in the remote directory and the subdirectory subDir. However, see below for an alternative available in version 5.0.

If a subdirectory is filtered, no additional traversal of that subdirectory is performed.
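The following hypothetical model illustrates these traversal rules with the (subDir|.*1.txt) example. The regex is applied to every entry name, and a directory that does not match is never entered. The model assumes whole-name matching (java.util.regex Matcher.matches()). RecursiveFilterModel and its Node type are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical model of a filtered recursive mget; not Spring Integration code. */
final class RecursiveFilterModel {

    static final class Node {
        final String name;
        final List<Node> children; // null means "plain file"

        private Node(String name, List<Node> children) {
            this.name = name;
            this.children = children;
        }

        static Node file(String name) {
            return new Node(name, null);
        }

        static Node dir(String name, Node... children) {
            return new Node(name, List.of(children));
        }
    }

    /** Collects relative paths of matching files; a non-matching directory is skipped entirely. */
    static List<String> retrieve(Node dir, Pattern filter, String prefix, List<String> out) {
        for (Node child : dir.children) {
            if (!filter.matcher(child.name).matches()) {
                continue; // filtered out: no traversal below a filtered directory
            }
            if (child.children == null) {
                out.add(prefix + child.name);
            } else {
                retrieve(child, filter, prefix + child.name + "/", out);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Node remote = Node.dir("",
                Node.file("a1.txt"),
                Node.file("a2.txt"),
                Node.dir("subDir", Node.file("b1.txt"), Node.file("b2.txt")),
                Node.dir("other", Node.file("c1.txt")));
        Pattern filter = Pattern.compile("(subDir|.*1.txt)");
        System.out.println(retrieve(remote, filter, "", new ArrayList<>())); // [a1.txt, subDir/b1.txt]
    }
}
```

Note that the unescaped . in .*1.txt matches any character, so a name such as a1xtxt would also pass. Escape the dot (\.) if that matters.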

The -dirs option is not allowed (the recursive mget uses the recursive ls to obtain the directory tree and the directories themselves cannot be included in the list).

Typically, you would use the #remoteDirectory variable in the local-directory-expression so that the remote directory structure is retained locally.

Starting with version 5.0, the SftpSimplePatternFileListFilter and SftpRegexPatternFileListFilter can be configured to always pass directories by setting the alwaysAcceptDirectories property to true. This allows recursion with a simple pattern, as the following examples show:

<bean id="starDotTxtFilter"
            class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
    <constructor-arg value="*.txt" />
    <property name="alwaysAcceptDirectories" value="true" />
</bean>

<bean id="dotStarDotTxtFilter"
            class="org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter">
    <constructor-arg value="^.*\.txt$" />
    <property name="alwaysAcceptDirectories" value="true" />
</bean>

Then provide one of these filters by using the filter attribute on the gateway.
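The effect of alwaysAcceptDirectories can be modeled in a few lines. In this sketch, java.nio's glob PathMatcher stands in for the framework's simple pattern matching, and DirAwareGlobFilter is a hypothetical class. With the flag off, a directory such as subDir fails *.txt and recursion stops there. With the flag on, directories always pass and only files are tested against the pattern.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

/** Hypothetical model of a simple-pattern filter with an "always accept directories" switch. */
final class DirAwareGlobFilter {

    private final PathMatcher matcher;
    private final boolean alwaysAcceptDirectories;

    DirAwareGlobFilter(String glob, boolean alwaysAcceptDirectories) {
        this.matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        this.alwaysAcceptDirectories = alwaysAcceptDirectories;
    }

    boolean accept(String name, boolean isDirectory) {
        if (isDirectory && alwaysAcceptDirectories) {
            return true; // let recursion descend regardless of the pattern
        }
        return matcher.matches(Paths.get(name));
    }

    public static void main(String[] args) {
        DirAwareGlobFilter plain = new DirAwareGlobFilter("*.txt", false);
        DirAwareGlobFilter recursive = new DirAwareGlobFilter("*.txt", true);
        System.out.println(plain.accept("subDir", true));        // false: recursion would stop here
        System.out.println(recursive.accept("subDir", true));    // true
        System.out.println(recursive.accept("notes.md", false)); // false
    }
}
```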

See also Section 28.11.3, “Outbound Gateway Partial Success (mget and mput)”

put

put sends a file to the remote server; the payload of the message can be a java.io.File, a byte[], or a String. A remote-filename-generator (or expression) is used to name the remote file. Other available attributes include remote-directory, temporary-remote-directory (and their *-expression equivalents), use-temporary-file-name, and auto-create-directory. See the schema documentation for more information.

The message payload resulting from a put operation is a String representing the full path of the file on the server after transfer.

Version 4.3 introduced the chmod attribute, which changes the remote file permissions after upload. Use the conventional Unix octal format (for example, 600 allows read-write for the file owner only). When configuring the gateway in Java, you can use setChmod(0600).

mput

mput sends multiple files to the server and supports the following option:

  • -R - Recursive - send all files (possibly filtered) in the directory and subdirectories

The message payload must be a java.io.File representing a local directory.

The same attributes as the put command are supported. In addition, files in the local directory can be filtered with one of mput-pattern, mput-regex, mput-filter or mput-filter-expression. The filter works with recursion, as long as the subdirectories themselves pass the filter. Subdirectories that do not pass the filter are not recursed.

The message payload resulting from an mput operation is a List<String> object: a List of remote file paths resulting from the transfer.

See also Section 28.11.3, “Outbound Gateway Partial Success (mget and mput)”

Version 4.3 introduced the chmod attribute, which changes the remote file permissions after upload. Use the conventional Unix octal format (for example, 600 allows read-write for the file owner only). When configuring the gateway in Java, you can use setChmod(0600).

rm

The rm command has no options.

The message payload resulting from an rm operation is Boolean.TRUE if the remove was successful, Boolean.FALSE otherwise. The remote directory is provided in the file_remoteDirectory header, and the filename is provided in the file_remoteFile header.

mv

The mv command has no options.

The expression attribute defines the "from" path and the rename-expression attribute defines the "to" path. By default, the rename-expression is headers['file_renameTo']. This expression must not evaluate to null or an empty String. If necessary, any remote directories needed are created. The payload of the result message is Boolean.TRUE. The original remote directory is provided in the file_remoteDirectory header, and the filename is provided in the file_remoteFile header. The new path is in the file_renameTo header.

Additional Information

The get and mget commands support the local-filename-generator-expression attribute. It defines a SpEL expression to generate the name of local file(s) during the transfer. The root object of the evaluation context is the request Message but, in addition, the remoteFileName variable is also available, which is particularly useful for mget, for example: local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo"

The get and mget commands support the local-directory-expression attribute. It defines a SpEL expression to generate the name of the local directory (or directories) during the transfer. The root object of the evaluation context is the request Message but, in addition, the remoteDirectory variable is also available, which is particularly useful for mget, for example: local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.foo". This attribute is mutually exclusive with the local-directory attribute.

For all commands, the PATH that the command acts on is provided by the expression property of the gateway. For the mget command, the expression might evaluate to *, meaning retrieve all files, or to somedirectory/*, and so on.

Here is an example of a gateway configured for an ls command:

<int-sftp:outbound-gateway id="gateway1"
        session-factory="sftpSessionFactory"
        request-channel="inbound1"
        command="ls"
        command-options="-1"
        expression="payload"
        reply-channel="toSplitter"/>

The payload of the message sent to the toSplitter channel is a list of String objects containing the filename of each file. If the command-options was omitted, it would be a list of FileInfo objects. Options are provided space-delimited, e.g. command-options="-1 -dirs -links".

Starting with version 4.2, the GET, MGET, PUT and MPUT commands support a FileExistsMode property (mode when using the namespace support). This affects the behavior when the local file exists (GET and MGET) or the remote file exists (PUT and MPUT). Supported modes are REPLACE, APPEND, FAIL and IGNORE. For backwards compatibility, the default mode for PUT and MPUT operations is REPLACE and for GET and MGET operations, the default is FAIL.

28.11.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the Outbound Gateway using Java configuration:

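import com.jcraft.jsch.ChannelSftp.LsEntry;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.gateway.SftpOutboundGateway;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    // assumes a SessionFactory<LsEntry> bean is defined elsewhere; the path is a SpEL string literal
    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler(SessionFactory<LsEntry> sftpSessionFactory) {
        return new SftpOutboundGateway(sftpSessionFactory, "ls", "'my_remote_dir/'");
    }

}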

28.11.2 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the Outbound Gateway using the Java DSL:

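import com.jcraft.jsch.ChannelSftp.LsEntry;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.QueueChannelSpec;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        // placeholder connection settings; adjust for your server
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(22);
        sf.setUser("foo");
        sf.setPassword("foo");
        sf.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    @Bean
    public QueueChannelSpec remoteFileOutputChannel() {
        return MessageChannels.queue();
    }

    // recursive MGET: files are filtered by regex and written under myDir/<remote directory>
    @Bean
    public IntegrationFlow sftpMGetFlow() {
        return IntegrationFlows.from("sftpMgetInputChannel")
            .handle(Sftp.outboundGateway(sftpSessionFactory(),
                            AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
                    .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
                    .regexFileNameFilter("(subSftpSource|.*1.txt)")
                    .localDirectoryExpression("'myDir/' + #remoteDirectory")
                    .localFilenameExpression("#remoteFileName.replaceFirst('sftpSource', 'localTarget')"))
            .channel("remoteFileOutputChannel")
            .get();
    }

}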

28.11.3 Outbound Gateway Partial Success (mget and mput)

When performing operations on multiple files (mget and mput) it is possible that an exception occurs some time after one or more files have been transferred. In this case (starting with version 4.2), a PartialSuccessException is thrown. As well as the usual MessagingException properties (failedMessage and cause), this exception has two additional properties:

  • partialResults - the successful transfer results.
  • derivedInput - the list of files generated from the request message (e.g. local files to transfer for an mput).

This will enable you to determine which files were successfully transferred, and which were not.

In the case of a recursive mput, the PartialSuccessException may have nested PartialSuccessException instances.

Consider:

root/
|- file1.txt
|- subdir/
   | - file2.txt
   | - file3.txt
|- zoo.txt

If the exception occurs on file3.txt, the PartialSuccessException thrown by the gateway has a derivedInput of file1.txt, subdir, and zoo.txt and a partialResults of file1.txt. Its cause is another PartialSuccessException with a derivedInput of file2.txt and file3.txt and a partialResults of file2.txt.
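The bookkeeping described above can be sketched with a hypothetical PartialOutcome class that mirrors the derivedInput, partialResults, and cause properties. It is not the framework's PartialSuccessException. Walking the chain yields everything that was not transferred. For the example tree, subdir itself is reported at the outer level because it did not complete, and the nested level pinpoints file3.txt.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a (possibly nested) partial-success failure. */
final class PartialOutcome {

    final List<String> derivedInput;   // what was attempted at this level
    final List<String> partialResults; // what succeeded at this level
    final PartialOutcome cause;        // nested outcome for a recursed subdirectory, or null

    PartialOutcome(List<String> derivedInput, List<String> partialResults, PartialOutcome cause) {
        this.derivedInput = derivedInput;
        this.partialResults = partialResults;
        this.cause = cause;
    }

    /** Inputs not reported as transferred, at this level and in every nested level. */
    List<String> notTransferred() {
        List<String> missing = new ArrayList<>();
        for (String input : derivedInput) {
            if (!partialResults.contains(input)) {
                missing.add(input);
            }
        }
        if (cause != null) {
            missing.addAll(cause.notTransferred());
        }
        return missing;
    }

    public static void main(String[] args) {
        // the example tree above: the failure happens while sending subdir/file3.txt
        PartialOutcome inner = new PartialOutcome(List.of("file2.txt", "file3.txt"), List.of("file2.txt"), null);
        PartialOutcome outer = new PartialOutcome(List.of("file1.txt", "subdir", "zoo.txt"), List.of("file1.txt"), inner);
        System.out.println(outer.notTransferred()); // [subdir, zoo.txt, file3.txt]
    }
}
```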

28.12 SFTP/JSCH Logging

Because we use the JSch library (http://www.jcraft.com/jsch/) to provide SFTP support, you may at times need more information from the JSch API itself, especially if something is not working properly (for example, authentication exceptions). Unfortunately, JSch does not use commons-logging; instead, it relies on custom implementations of its com.jcraft.jsch.Logger interface. As of Spring Integration 2.0.1, we implement this interface, so all you need to do to enable JSch logging is configure your logger as you usually do. For example, here is a valid Log4J logger configuration:

log4j.category.com.jcraft.jsch=DEBUG

28.13 MessageSessionCallback

Starting with Spring Integration version 4.2, you can use a MessageSessionCallback<F, T> implementation with the <int-sftp:outbound-gateway/> (SftpOutboundGateway) to perform any operations on the Session<LsEntry> with the requestMessage context. You can use it for any non-standard or low-level SFTP operations, for example, to allow access from an integration flow definition or to inject a functional interface (lambda) implementation:

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler sftpOutboundGateway(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
    // the payload is expected to be the remote directory path, so it is cast to String
    return new SftpOutboundGateway(sessionFactory,
         (session, requestMessage) -> session.list((String) requestMessage.getPayload()));
}

Another example might be to pre-process or post-process the file data being sent or retrieved.

When using XML configuration, the <int-sftp:outbound-gateway/> provides a session-callback attribute to allow you to specify the MessageSessionCallback bean name.

Note

The session-callback is mutually exclusive with the command and expression attributes. When configuring with Java, different constructors are available in the SftpOutboundGateway class.