SFTP Adapters
Spring Integration provides support for file transfer operations over SFTP.
The Secure File Transfer Protocol (SFTP) is a network protocol that lets you transfer files between two computers on the Internet over any reliable stream.
The SFTP protocol requires a secure channel, such as SSH, and visibility of the client’s identity throughout the SFTP session.
Spring Integration supports sending and receiving files over SFTP by providing three client-side endpoints: inbound channel adapter, outbound channel adapter, and outbound gateway. It also provides convenient namespace configuration to define these client components.
Starting with version 6.0, the outdated JCraft JSch client has been replaced with the modern Apache MINA SSHD framework.
This caused many breaking changes in the framework components.
However, in most cases, the migration is hidden behind the Spring Integration API.
The most drastic change affects the DefaultSftpSessionFactory, which is now based on org.apache.sshd.client.SshClient and exposes some of its configuration properties.
You need to include this dependency in your project:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-sftp</artifactId>
    <version>6.1.1</version>
</dependency>
implementation "org.springframework.integration:spring-integration-sftp:6.1.1"
To include the SFTP namespace in your XML configuration, include the following attributes on the root element:
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp
https://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd"
SFTP Session Factory
As of version 3.0, sessions are no longer cached by default. See SFTP Session Caching.
Before configuring SFTP adapters, you must configure an SFTP session factory. You can configure the SFTP session factory with a regular bean definition, as the following example shows:
<beans:bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="localhost"/>
    <beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
    <beans:property name="privateKeyPassphrase" value="springIntegration"/>
    <beans:property name="port" value="22"/>
    <beans:property name="user" value="kermit"/>
</beans:bean>
Every time an adapter requests a session object from its SessionFactory, a new SFTP session is created.
Under the covers, the SFTP Session Factory relies on the Apache MINA SSHD library to provide the SFTP capabilities.
However, Spring Integration also supports the caching of SFTP sessions. See SFTP Session Caching for more information.
The DefaultSftpSessionFactory can use an externally configured or extended SshClient.
For example, the org.eclipse.jgit.internal.transport.sshd.JGitSshClient extension from the org.eclipse.jgit:org.eclipse.jgit.ssh.apache library may be used to provide support for HTTP/SOCKS proxies.
When using a shared session (the isSharedSession constructor argument set to true), you must wrap the session factory in a caching session factory, as described later, so that the connection is not physically closed when an operation completes. If the cache is reset, the session is disconnected only when the last channel is closed. The connection is refreshed if it is found to be disconnected when a new operation obtains a session.
Now all you need to do is inject this SFTP session factory into your adapters.
A more practical way to provide values for the SFTP session factory is to use Spring’s property placeholder support.
Configuration Properties
The following list describes all the properties that are exposed by the DefaultSftpSessionFactory.
- isSharedSession (constructor argument): When true, a single SftpClient is used for all the requested SftpSession instances. It defaults to false.
- sftpVersionSelector: An SftpVersionSelector instance for SFTP protocol selection. The default one is SftpVersionSelector.CURRENT.
- host: The URL of the host to which to connect. Required.
- hostConfig: An org.apache.sshd.client.config.hosts.HostConfigEntry instance as an alternative for the user/host/port options. Can be configured with a proxy jump property.
- port: The port over which the SFTP connection shall be established. If not specified, this value defaults to 22. If specified, this property must be a positive number.
- user: The remote user to use. Required.
- knownHostsResource: An org.springframework.core.io.Resource that is used for a host key repository. The content of the resource must be in the same format as an OpenSSH known_hosts file; it is required and must be pre-populated if allowUnknownKeys is false.
- password: The password to authenticate against the remote host. If a password is not provided, then the privateKey property is required.
- privateKey: An org.springframework.core.io.Resource that represents the location of the private key used for authenticating against the remote host. If the privateKey is not provided, then the password property is required.
- privateKeyPassphrase: The password for the private key. If you set userInfo, privateKeyPassphrase is not allowed. The passphrase is obtained from that object. Optional.
- timeout: The timeout property is used as the socket timeout parameter, as well as the default connection timeout. Defaults to 0, which means that no timeout occurs.
- allowUnknownKeys: Set to true to allow connections to hosts with unknown (or changed) keys. It defaults to false. If false, a pre-populated knownHosts file is required.
- userInteraction: A custom org.apache.sshd.client.auth.keyboard.UserInteraction to be used during authentication.
Delegating Session Factory
Version 4.2 introduced the DelegatingSessionFactory, which allows the selection of the actual session factory at runtime.
Prior to invoking the SFTP endpoint, you can call setThreadKey() on the factory to associate a key with the current thread.
That key is then used to look up the actual session factory to be used.
You can clear the key by calling clearThreadKey() after use.
We added convenience methods so that you can more easily do so from a message flow, as the following example shows:
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
    <constructor-arg>
        <bean class="org.springframework.integration.file.remote.session.DefaultSessionFactoryLocator">
            <!-- delegate factories here -->
        </bean>
    </constructor-arg>
</bean>
<int:service-activator input-channel="in" output-channel="c1"
    expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
<int-sftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
<int:service-activator input-channel="c2" output-channel="out"
    expression="@dsf.clearThreadKey(#root)" />
When using session caching (see SFTP Session Caching), each of the delegates should be cached.
You cannot cache the DelegatingSessionFactory itself.
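The thread-key mechanism described above can be pictured with a small framework-free model. This is only a sketch under stated assumptions: the class name ThreadKeyedSelectionDemo, the host names, and the fallback to a default target are all hypothetical, and the code is not the DelegatingSessionFactory implementation. It shows a key bound to the current thread steering a later lookup, and why the key should be cleared in a finally block.

```java
import java.util.Map;

/** Framework-free model of thread-keyed delegate selection; NOT the Spring Integration class. */
final class ThreadKeyedSelectionDemo {

    // Hypothetical mapping: lookup key -> target host (stands in for the delegate factories).
    private final Map<String, String> hostsByKey;
    // Assumption of this sketch: unknown or missing keys fall back to a default target.
    private final String defaultHost;
    private final ThreadLocal<String> threadKey = new ThreadLocal<>();

    ThreadKeyedSelectionDemo(Map<String, String> hostsByKey, String defaultHost) {
        this.hostsByKey = hostsByKey;
        this.defaultHost = defaultHost;
    }

    /** Associates a key with the current thread only. */
    void setThreadKey(String key) {
        threadKey.set(key);
    }

    /** Removes the association so pooled threads do not leak a stale key. */
    void clearThreadKey() {
        threadKey.remove();
    }

    /** Resolves the target for whatever key the calling thread has set. */
    String currentHost() {
        String key = threadKey.get();
        return key == null ? defaultHost : hostsByKey.getOrDefault(key, defaultHost);
    }

    public static void main(String[] args) {
        ThreadKeyedSelectionDemo selector = new ThreadKeyedSelectionDemo(
                Map.of("one", "sftp1.example.org", "two", "sftp2.example.org"),
                "sftp0.example.org");
        selector.setThreadKey("two");
        try {
            System.out.println(selector.currentHost());
        }
        finally {
            selector.clearThreadKey();
        }
        System.out.println(selector.currentHost());
    }
}
```

Because the key lives in a ThreadLocal, the set and clear calls must run on the same thread as the SFTP operation between them.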
Starting with version 5.0.7, the DelegatingSessionFactory
can be used in conjunction with a RotatingServerAdvice
to poll multiple servers; see Inbound Channel Adapters: Polling Multiple Servers and Directories.
SFTP Session Caching
Starting with Spring Integration version 3.0, sessions are no longer cached by default.
The cache-sessions attribute is no longer supported on endpoints.
If you wish to cache sessions, you must use a CachingSessionFactory (see the next example).
In versions prior to 3.0, the sessions were automatically cached by default.
A cache-sessions attribute was available for disabling the auto caching, but that solution did not provide a way to configure other session-caching attributes.
For example, you could not limit the number of sessions created.
To support that requirement and other configuration options, we added a CachingSessionFactory.
It provides sessionCacheSize
and sessionWaitTimeout
properties.
As its name suggests, the sessionCacheSize
property controls how many active sessions the factory maintains in its cache (the default is unbounded).
If the sessionCacheSize
threshold has been reached, any attempt to acquire another session blocks until either one of the cached sessions becomes available or until the wait time for a session expires (the default wait time is Integer.MAX_VALUE
).
The sessionWaitTimeout
property enables configuration of the wait time.
If you want your sessions to be cached, configure your default session factory (as described earlier) and then wrap it in an instance of CachingSessionFactory
where you may provide those additional properties.
The following example shows how to do so:
<bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory"
    class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="sftpSessionFactory"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>
The preceding example creates a CachingSessionFactory
with its sessionCacheSize
set to 10
and its sessionWaitTimeout
set to one second (1000 milliseconds).
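To make the interplay of sessionCacheSize and sessionWaitTimeout concrete, the following sketch models a bounded pool with a plain java.util.concurrent.Semaphore. It is an illustration only, not the CachingSessionFactory implementation: the class BoundedSessionPoolDemo and its methods are hypothetical, and returning false on timeout is a simplification chosen for this sketch.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Framework-free model of a size-limited session cache; NOT the CachingSessionFactory. */
final class BoundedSessionPoolDemo {

    private final Semaphore slots;
    private final long waitTimeoutMillis;

    BoundedSessionPoolDemo(int sessionCacheSize, long sessionWaitTimeoutMillis) {
        this.slots = new Semaphore(sessionCacheSize);
        this.waitTimeoutMillis = sessionWaitTimeoutMillis;
    }

    /**
     * Blocks until a slot is free or the wait timeout elapses.
     * Returns false on timeout (a simplification made for this sketch).
     */
    boolean tryAcquireSession() {
        try {
            return slots.tryAcquire(waitTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns a session to the pool, freeing one slot for a waiting caller. */
    void releaseSession() {
        slots.release();
    }

    public static void main(String[] args) {
        BoundedSessionPoolDemo pool = new BoundedSessionPoolDemo(2, 100);
        System.out.println(pool.tryAcquireSession()); // first slot
        System.out.println(pool.tryAcquireSession()); // second slot
        System.out.println(pool.tryAcquireSession()); // pool exhausted, waits 100 ms
        pool.releaseSession();
        System.out.println(pool.tryAcquireSession()); // slot available again
    }
}
```

With a small cache size, a short wait timeout surfaces pool exhaustion quickly, whereas the default wait time lets callers block almost indefinitely.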
Starting with Spring Integration version 3.0, the CachingSessionFactory provides a resetCache() method.
When invoked, all idle sessions are immediately closed and in-use sessions are closed when they are returned to the cache.
When using isSharedSession=true
, the channel is closed and the shared session is closed only when the last channel is closed.
New requests for sessions establish new sessions as necessary.
Starting with version 5.1, the CachingSessionFactory
has a new property testSession
.
When true, the session will be tested by performing a REALPATH
command for an empty path to ensure it is still active; if not, it will be removed from the cache; a new session is created if no active sessions are in the cache.
Using RemoteFileTemplate
Spring Integration version 3.0 provides a new abstraction over the SftpSession
object.
The template provides methods to send, retrieve (as an InputStream
), remove, and rename files.
In addition, we provide an execute
method to let the caller run multiple operations on the session.
In all cases, the template takes care of reliably closing the session.
For more information, see the Javadoc for RemoteFileTemplate.
There is a subclass for SFTP: SftpRemoteFileTemplate.
We added additional methods in version 4.1, including getClientInstance().
It provides access to the underlying SftpClient, which enables access to low-level APIs.
Version 5.0 introduced the RemoteFileOperations.invoke(OperationsCallback<F, T> action) method.
This method lets several RemoteFileOperations calls be made in the scope of the same thread-bound Session.
This is useful when you need to perform several high-level operations of the RemoteFileTemplate
as one unit of work.
For example, AbstractRemoteFileOutboundGateway
uses it with the mput
command implementation, where we perform a put
operation for each file in the provided directory and recursively for its sub-directories.
See the Javadoc for more information.
SFTP Inbound Channel Adapter
The SFTP inbound channel adapter is a special listener that connects to the server and listens for the remote directory events (such as a new file being created), at which point it initiates a file transfer. The following example shows how to configure an SFTP inbound channel adapter:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        filename-pattern="*.txt"
        remote-directory="/foo/bar"
        preserve-timestamp="true"
        local-directory="file:target/foo"
        auto-create-local-directory="true"
        local-filename-generator-expression="#this.toUpperCase() + '.a'"
        scanner="myDirScanner"
        local-filter="myFilter"
        temporary-file-suffix=".writing"
        max-fetch-size="-1"
        delete-remote-files="false">
    <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
The preceding configuration example shows how to provide values for various attributes, including the following:
- local-directory: The location to which files are going to be transferred
- remote-directory: The remote source directory from which files are going to be transferred
- session-factory: A reference to the bean we configured earlier
By default, the transferred file carries the same name as the original file.
If you want to override this behavior, you can set the local-filename-generator-expression
attribute, which lets you provide a SpEL expression to generate the name of the local file.
Unlike outbound gateways and adapters, where the root object of the SpEL evaluation context is a Message
, this inbound adapter does not yet have the message at the time of evaluation, since that is what it ultimately generates with the transferred file as its payload.
Consequently, the root object of the SpEL evaluation context is the original name of the remote file (a String
).
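Since the root object is just the remote file name, the expression #this.toUpperCase() + '.a' used in the earlier configuration behaves like an ordinary String-to-String function. The following plain Java equivalent is a sketch for illustration (the class and method names are hypothetical, and no SpEL evaluation is involved):

```java
/** Plain-Java equivalent of the SpEL expression "#this.toUpperCase() + '.a'". */
final class LocalFilenameDemo {

    /** The only input is the remote file name; no Message exists yet at this point. */
    static String localFilename(String remoteFilename) {
        return remoteFilename.toUpperCase() + ".a";
    }

    public static void main(String[] args) {
        // prints REPORT.TXT.a
        System.out.println(localFilename("report.txt"));
    }
}
```

Anything the local name should depend on must therefore be derivable from the remote file name itself, because headers and payload are not available during evaluation.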
The inbound channel adapter first retrieves the file to a local directory and then emits each file according to the poller configuration.
Starting with version 5.0, you can limit the number of files fetched from the SFTP server when new file retrievals are needed.
This can be beneficial when the target files are large or when running in a clustered system with a persistent file list filter, discussed later in this section.
Use max-fetch-size
for this purpose.
A negative value (the default) means no limit and all matching files are retrieved.
See Inbound Channel Adapters: Controlling Remote File Fetching for more information.
Since version 5.0, you can also provide a custom DirectoryScanner
implementation to the inbound-channel-adapter
by setting the scanner
attribute.
Starting with Spring Integration 3.0, you can specify the preserve-timestamp
attribute (the default is false
).
When true
, the local file’s modified timestamp is set to the value retrieved from the server.
Otherwise, it is set to the current time.
Starting with version 4.2, you can specify remote-directory-expression
instead of remote-directory
, which lets you dynamically determine the directory on each poll — for example, remote-directory-expression="@myBean.determineRemoteDir()"
.
Sometimes, file filtering based on the simple pattern specified via filename-pattern
attribute might not suffice.
If this is the case, you can use the filename-regex
attribute to specify a regular expression (for example, filename-regex=".*\.test$"
).
If you need complete control, you can use the filter
attribute to provide a reference to a custom implementation of the org.springframework.integration.file.filters.FileListFilter
, which is a strategy interface for filtering a list of files.
This filter determines which remote files are retrieved.
You can also combine a pattern-based filter with other filters (such as an AcceptOnceFileListFilter
, to avoid synchronizing files that have previously been fetched) by using a CompositeFileListFilter
.
The AcceptOnceFileListFilter
stores its state in memory.
If you wish the state to survive a system restart, consider using the SftpPersistentAcceptOnceFileListFilter
instead.
This filter stores the accepted file names in an instance of the MetadataStore
strategy (see Metadata Store).
This filter matches on the filename and the remote modified time.
Since version 4.0, this filter requires a ConcurrentMetadataStore
.
When used with a shared data store (such as Redis
with the RedisMetadataStore
), this lets filter keys be shared across multiple application or server instances.
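The accept-once behavior keyed on file name and modified time can be sketched without the framework. The following model is an assumption-laden illustration, not the SftpPersistentAcceptOnceFileListFilter source: a ConcurrentHashMap stands in for the ConcurrentMetadataStore, and the class name and key prefix handling are hypothetical. It shows why atomic putIfAbsent and replace operations matter when several instances share one store.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Framework-free model of a persistent accept-once filter; NOT the Spring Integration class. */
final class PersistentAcceptOnceDemo {

    // Stand-in for a shared ConcurrentMetadataStore (for example, one backed by Redis).
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    private final String prefix;

    PersistentAcceptOnceDemo(String prefix) {
        this.prefix = prefix;
    }

    /** Accepts a file the first time it is seen and again whenever its modified time changes. */
    boolean accept(String filename, long modifiedMillis) {
        String key = prefix + filename;
        String newValue = Long.toString(modifiedMillis);
        String oldValue = store.putIfAbsent(key, newValue);
        if (oldValue == null) {
            return true; // never seen before
        }
        if (oldValue.equals(newValue)) {
            return false; // same name and timestamp: already processed
        }
        // Timestamp changed: only the caller that wins the atomic update accepts the file.
        return store.replace(key, oldValue, newValue);
    }

    public static void main(String[] args) {
        PersistentAcceptOnceDemo filter = new PersistentAcceptOnceDemo("sftp-");
        System.out.println(filter.accept("a.txt", 1000L)); // true
        System.out.println(filter.accept("a.txt", 1000L)); // false
        System.out.println(filter.accept("a.txt", 2000L)); // true
    }
}
```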
Starting with version 5.0, the SftpPersistentAcceptOnceFileListFilter
with an in-memory SimpleMetadataStore
is applied by default for the SftpInboundFileSynchronizer
.
This filter is also applied, together with the regex
or pattern
option in the XML configuration, as well as through SftpInboundChannelAdapterSpec
in Java DSL.
You can handle any other use-cases by using CompositeFileListFilter
(or ChainFileListFilter
).
The above discussion refers to filtering the files before retrieving them. Once the files have been retrieved, an additional filter is applied to the files on the file system. By default, this is an AcceptOnceFileListFilter, which, as discussed in this section, retains state in memory and does not consider the file’s modified time. Unless your application removes files after processing, the adapter re-processes the files on disk by default after an application restart.
Also, if you configure the filter to use an SftpPersistentAcceptOnceFileListFilter and the remote file timestamp changes (causing it to be re-fetched), the default local filter does not allow this new file to be processed.
For more information about this filter, and how it is used, see Remote Persistent File List Filters.
You can use the local-filter
attribute to configure the behavior of the local file system filter.
Starting with version 4.3.8, a FileSystemPersistentAcceptOnceFileListFilter
is configured by default.
This filter stores the accepted file names and modified timestamp in an instance of the MetadataStore
strategy (see Metadata Store) and detects changes to the local file modified time.
The default MetadataStore
is a SimpleMetadataStore
that stores state in memory.
Since version 4.1.5, these filters have a new property called flushOnUpdate
, which causes them to flush the
metadata store on every update (if the store implements Flushable
).
Further, if you use a distributed MetadataStore (such as Redis Metadata Store), you can have multiple instances of the same adapter or application and be sure that one and only one instance processes a file.
The actual local filter is a CompositeFileListFilter
that contains the supplied filter and a pattern filter that prevents processing files that are in the process of being downloaded (based on the temporary-file-suffix
).
Files are downloaded with this suffix (the default is .writing
), and the files are renamed to their final names when the transfer is complete, making them 'visible' to the filter.
See the schema for more detail on these attributes.
The SFTP inbound channel adapter is a polling consumer.
Therefore, you must configure a poller (either a global default or a local element).
Once the file has been transferred to a local directory, a message with java.io.File
as its payload type is generated and sent to the channel identified by the channel
attribute.
More on File Filtering and Large Files
Sometimes, a file that just appeared in the monitored (remote) directory is not complete.
Typically, such a file is written with some temporary extension (such as .writing
on a file named something.txt.writing
) and then renamed after the writing process completes.
In most cases, developers are interested only in files that are complete and would like to filter only those files.
To handle these scenarios, you can use the filtering support provided by the filename-pattern
, filename-regex
, and filter
attributes.
If you need a custom filter implementation, you can include a reference in your adapter by setting the filter
attribute.
The following example shows how to do so:
<int-sftp:inbound-channel-adapter id="sftpInboundAdapter"
        channel="receiveChannel"
        session-factory="sftpSessionFactory"
        filter="customFilter"
        local-directory="file:/local-test-dir"
        remote-directory="/remote-test-dir">
    <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
Recovering from Failures
You should understand the architecture of the adapter.
A file synchronizer fetches the files, and a FileReadingMessageSource
emits a message for each synchronized file.
As discussed earlier, two filters are involved.
The filter attribute (and patterns) refers to the remote (SFTP) file list, to avoid fetching files that have already been fetched.
The FileReadingMessageSource uses the local-filter to determine which files are to be sent as messages.
The synchronizer lists the remote files and consults its filter.
The files are then transferred.
If an IO error occurs during file transfer, any files that have already been added to the filter are removed so that they are eligible to be re-fetched on the next poll.
This applies only if the filter implements ReversibleFileListFilter
(such as the AcceptOnceFileListFilter
).
If, after synchronizing the files, an error occurs on the downstream flow processing a file, no automatic rollback of the filter occurs, so the failed file is not reprocessed by default.
If you wish to reprocess such files after a failure, you can use a configuration similar to the following to facilitate the removal of the failed file from the filter:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />
The preceding configuration works for any ResettableFileListFilter
.
Starting with version 5.0, the inbound channel adapter can build sub-directories locally, according to the generated local file name.
That can be a remote sub-path as well.
To read a local directory recursively and detect modifications across that hierarchy, you can now supply the internal FileReadingMessageSource with a new RecursiveDirectoryScanner based on the Files.walk() algorithm.
See AbstractInboundFileSynchronizingMessageSource.setScanner() for more information.
Also, you can now switch the AbstractInboundFileSynchronizingMessageSource to the WatchService-based DirectoryScanner by using the setUseWatchService() option.
It is also configured for all the WatchEventType instances to react to any modifications in the local directory.
The reprocessing sample shown earlier is based on the built-in functionality of the FileReadingMessageSource.WatchServiceDirectoryScanner
, which uses ResettableFileListFilter.remove()
when the file is deleted (StandardWatchEventKinds.ENTRY_DELETE
) from the local directory.
See WatchServiceDirectoryScanner
for more information.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound adapter with Java:
@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        // testSession is a property of the CachingSessionFactory, so set it on the wrapper
        CachingSessionFactory<SftpClient.DirEntry> cachingFactory = new CachingSessionFactory<>(factory);
        cachingFactory.setTestSession(true);
        return cachingFactory;
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:
@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlow
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }

}
Dealing With Incomplete Data
The SftpSystemMarkerFilePresentFileListFilter
is provided to filter remote files that don’t have the corresponding marker file on the remote system.
See the Javadoc for configuration information.
SFTP Streaming Inbound Channel Adapter
Version 4.3 introduced the streaming inbound channel adapter.
This adapter produces messages with payloads of type InputStream, letting you fetch files without writing to the local file system.
Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed.
The session is provided in the closeableResource
header (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
).
Standard framework components, such as the FileSplitter
and StreamTransformer
, automatically close the session.
See File Splitter and Stream Transformer for more information about these components.
The following example shows how to configure an SFTP streaming inbound channel adapter:
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
        channel="ftpChannel"
        session-factory="sessionFactory"
        filename-pattern="*.txt"
        filename-regex=".*\.txt"
        filter="filter"
        filter-expression="@myFilterBean.check(#root)"
        remote-file-separator="/"
        comparator="comparator"
        max-fetch-size="1"
        remote-directory-expression="'foo/bar'">
    <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>
You can use only one of filename-pattern
, filename-regex
, filter
, or filter-expression
.
Starting with version 5.0, by default, the SftpStreamingMessageSource adapter prevents duplicates for remote files by using SftpPersistentAcceptOnceFileListFilter based on the in-memory SimpleMetadataStore.
By default, this filter is also applied together with the filename pattern (or regex).
If you need to allow duplicates, you can use the AcceptAllFileListFilter.
You can handle any other use cases by using CompositeFileListFilter (or ChainFileListFilter).
The Java configuration shown later shows one technique to remove the remote file after processing, avoiding duplicates.
For more information about the SftpPersistentAcceptOnceFileListFilter
, and how it is used, see Remote Persistent File List Filters.
You can use the max-fetch-size
attribute to limit the number of files fetched on each poll when a fetch is necessary.
Set it to 1
and use a persistent filter when running in a clustered environment.
See Inbound Channel Adapters: Controlling Remote File Fetching for more information.
The adapter puts the remote directory and the file name in headers (FileHeaders.REMOTE_DIRECTORY
and FileHeaders.REMOTE_FILE
, respectively).
Starting with version 5.0, the FileHeaders.REMOTE_FILE_INFO
header provides additional remote file information (in JSON).
If you set the fileInfoJson
property on the SftpStreamingMessageSource
to false
, the header contains an SftpFileInfo
object.
You can access the SftpClient.DirEntry
object provided by the underlying SftpClient
by using the SftpFileInfo.getFileInfo()
method.
The fileInfoJson
property is not available when you use XML configuration, but you can set it by injecting the SftpStreamingMessageSource
into one of your configuration classes.
See also Remote File Information.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound adapter with Java:
@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}
Notice that, in this example, the message handler downstream of the transformer has an advice
that removes the remote file after processing.
Inbound Channel Adapters: Polling Multiple Servers and Directories
Starting with version 5.0.7, the RotatingServerAdvice is available; when configured as a poller advice, the inbound adapters can poll multiple servers and directories.
Configure the advice and add it to the poller’s advice chain as normal.
A DelegatingSessionFactory is used to select the server; see Delegating Session Factory for more information.
The advice configuration consists of a list of RotationPolicy.KeyDirectory objects.
@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
This advice polls directory foo on server one until no new files exist, then moves to directory bar, and then to directory baz on server two, and so on.
This default behavior can be modified with the fair constructor argument:
@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
In this case, the advice will move to the next server/directory regardless of whether the previous poll returned a file.
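The difference between the default and the fair rotation can be illustrated with a small simulation. This is a simplified model under stated assumptions, not the RotatingServerAdvice or its rotation policy: the class RotationDemo, the "server/directory" target strings, and the rule that each poll consumes at most one waiting file are all hypothetical choices for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Framework-free simulation of default versus fair rotation; NOT the RotatingServerAdvice. */
final class RotationDemo {

    /**
     * Returns the target visited on each poll.
     * filesWaiting[i] is the number of files waiting at targets.get(i);
     * each poll consumes at most one file (an assumption of this sketch).
     */
    static List<String> simulate(List<String> targets, int[] filesWaiting, boolean fair, int polls) {
        int[] remaining = filesWaiting.clone();
        List<String> visited = new ArrayList<>();
        int current = 0;
        for (int poll = 0; poll < polls; poll++) {
            visited.add(targets.get(current));
            boolean received = remaining[current] > 0;
            if (received) {
                remaining[current]--;
            }
            // Default: rotate only after an empty poll. Fair: rotate after every poll.
            if (fair || !received) {
                current = (current + 1) % targets.size();
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> targets = List.of("one/foo", "one/bar", "two/baz");
        int[] waiting = {2, 0, 1};
        // Default rotation stays on one/foo until a poll returns nothing.
        System.out.println(simulate(targets, waiting, false, 6));
        // Fair rotation advances after every poll.
        System.out.println(simulate(targets, waiting, true, 4));
    }
}
```

In this model, the default mode drains a busy directory before moving on, while the fair mode prevents one busy directory from starving the others.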
Alternatively, you can provide your own RotationPolicy
to reconfigure the message source as needed:
public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}
and
@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}
The local-filename-generator-expression
attribute (localFilenameGeneratorExpression
on the synchronizer) can now contain the #remoteDirectory
variable.
This allows files retrieved from different directories to be downloaded to similar directories locally:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
Do not configure a TaskExecutor on the poller when using this advice; see Conditional Pollers for Message Sources for more information.
Inbound Channel Adapters: Controlling Remote File Fetching
You should consider two properties when configuring inbound channel adapters.
max-messages-per-poll
, as with all pollers, can be used to limit the number of messages emitted on each poll (if more than the configured value are ready).
max-fetch-size
(since version 5.0) can limit the number of files retrieved from the remote server at a time.
The following scenarios assume the starting state is an empty local directory:
- max-messages-per-poll=2 and max-fetch-size=1: The adapter fetches one file, emits it, fetches the next file, and emits it. Then it sleeps until the next poll.
- max-messages-per-poll=2 and max-fetch-size=2: The adapter fetches both files and then emits each one.
- max-messages-per-poll=2 and max-fetch-size=4: The adapter fetches up to four files (if available) and emits the first two (if there are at least two). The next two files are emitted on the next poll.
- max-messages-per-poll=2 and max-fetch-size not specified: The adapter fetches all remote files and emits the first two (if there are at least two). The subsequent files are emitted on subsequent polls (two at a time). When all are consumed, the remote fetch is attempted again, to pick up any new files.
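The interplay of the two settings can be modeled with a small plain-Java simulation. This is only a sketch of the scenarios listed above, not the adapter's implementation: the FetchSimulation class is hypothetical, and treating a negative maxFetchSize as "no limit" is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Plain-Java simulation of the fetch/emit interplay; not the framework implementation. */
final class FetchSimulation {

    private final Deque<String> remote = new ArrayDeque<>();
    private final Deque<String> local = new ArrayDeque<>();
    private final int maxMessagesPerPoll;
    private final int maxFetchSize; // assumption of this sketch: negative means "no limit"

    FetchSimulation(List<String> remoteFiles, int maxMessagesPerPoll, int maxFetchSize) {
        this.remote.addAll(remoteFiles);
        this.maxMessagesPerPoll = maxMessagesPerPoll;
        this.maxFetchSize = maxFetchSize;
    }

    /** One poll: emit up to maxMessagesPerPoll files, fetching only when no local file is left. */
    List<String> poll() {
        List<String> emitted = new ArrayList<>();
        while (emitted.size() < this.maxMessagesPerPoll) {
            if (this.local.isEmpty()) {
                fetch();
            }
            if (this.local.isEmpty()) {
                break; // nothing could be fetched either
            }
            emitted.add(this.local.poll());
        }
        return emitted;
    }

    /** Moves at most maxFetchSize files from the remote queue to the local queue. */
    private void fetch() {
        int limit = this.maxFetchSize < 0 ? this.remote.size() : this.maxFetchSize;
        for (int i = 0; i < limit && !this.remote.isEmpty(); i++) {
            this.local.add(this.remote.poll());
        }
    }

    public static void main(String[] args) {
        FetchSimulation simulation =
                new FetchSimulation(Arrays.asList("a", "b", "c", "d", "e"), 2, 4);
        for (int i = 1; i <= 3; i++) {
            System.out.println("poll " + i + ": " + simulation.poll());
        }
    }
}
```

With five remote files, two messages per poll, and a fetch size of four, the first poll fetches four files and emits two, the second poll emits the remaining two local files, and the third poll fetches and emits the last file.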
When you deploy multiple instances of an application, we recommend setting a small max-fetch-size, to avoid one instance “grabbing” all the files and starving other instances.
Another use for max-fetch-size
is when you want to stop fetching remote files but continue to process files that have already been fetched.
Setting the maxFetchSize
property on the MessageSource
(programmatically, via JMX, or via a control bus) effectively stops the adapter from fetching more files but lets the poller continue to emit messages for files that have previously been fetched.
If the poller is active when the property is changed, the change takes effect on the next poll.
Starting with version 5.1, the synchronizer can be provided with a Comparator<?>.
This is useful when restricting the number of files fetched with maxFetchSize, because the order of the remote file list then determines which files are fetched first.
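The effect of ordering before limiting can be sketched in plain Java. The RemoteEntry type and the selectForFetch helper below are hypothetical stand-ins, not framework classes; the sketch assumes a positive fetch limit and that the ordering is applied to the listing before the limit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch of "sort, then limit"; not the synchronizer itself. */
final class SortedFetchSketch {

    /** Hypothetical minimal view of a remote entry: a name and a last-modified time. */
    static final class RemoteEntry {
        private final String name;
        private final long modified;

        RemoteEntry(String name, long modified) {
            this.name = name;
            this.modified = modified;
        }

        String getName() {
            return this.name;
        }

        long getModified() {
            return this.modified;
        }
    }

    /** Sorts a copy of the listing and keeps the names of the first maxFetchSize entries. */
    static List<String> selectForFetch(List<RemoteEntry> listing,
            Comparator<RemoteEntry> order, int maxFetchSize) {
        List<RemoteEntry> sorted = new ArrayList<>(listing);
        sorted.sort(order);
        List<String> names = new ArrayList<>();
        for (RemoteEntry entry : sorted.subList(0, Math.min(maxFetchSize, sorted.size()))) {
            names.add(entry.getName());
        }
        return names;
    }

    /** Oldest-first ordering with a limit of two entries. */
    static List<String> demo() {
        List<RemoteEntry> listing = new ArrayList<>();
        listing.add(new RemoteEntry("c.txt", 300L));
        listing.add(new RemoteEntry("a.txt", 100L));
        listing.add(new RemoteEntry("b.txt", 200L));
        return selectForFetch(listing, Comparator.comparingLong(RemoteEntry::getModified), 2);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without an ordering, the two files that happen to come first in the server listing would be fetched; with an oldest-first comparator, the two oldest files are fetched.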
SFTP Outbound Channel Adapter
The SFTP outbound channel adapter is a special MessageHandler
that connects to the remote directory and initiates a file transfer for every file it receives as the payload of an incoming Message
.
It also supports several representations of the file so that you are not limited to the File
object.
Similar to the FTP outbound adapter, the SFTP outbound channel adapter supports the following payloads:
- java.io.File: The actual file object
- byte[]: A byte array that represents the file contents
- java.lang.String: Text that represents the file contents
- java.io.InputStream: A stream of data to transfer to the remote file
- org.springframework.core.io.Resource: A resource for data to transfer to the remote file
The following example shows how to configure an SFTP outbound channel adapter:
<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
session-factory="sftpSessionFactory"
channel="inputChannel"
charset="UTF-8"
remote-file-separator="/"
remote-directory="foo/bar"
remote-filename-generator-expression="payload.getName() + '-mysuffix'"
filename-generator="fileNameGenerator"
use-temporary-file-name="true"
chmod="600"
mode="REPLACE"/>
See the schema for more detail on these attributes.
SpEL and the SFTP Outbound Adapter
As with many other components in Spring Integration, you can use the Spring Expression Language (SpEL) when you configure an SFTP outbound channel adapter by specifying two attributes: remote-directory-expression
and remote-filename-generator-expression
(described earlier).
The expression evaluation context has the message as its root object, which lets you use expressions that can dynamically compute the file name or the existing directory path based on the data in the message (from either the 'payload' or the 'headers').
In the preceding example, we define the remote-filename-generator-expression
attribute with an expression value that computes the file name based on its original name while also appending a suffix: '-mysuffix'.
Starting with version 4.1, you can specify the mode
when you are transferring the file.
By default, an existing file is overwritten.
The modes are defined by the FileExistsMode
enumeration, which includes the following values:
- REPLACE (default)
- REPLACE_IF_MODIFIED
- APPEND
- APPEND_NO_FLUSH
- IGNORE
- FAIL
With IGNORE
and FAIL
, the file is not transferred.
FAIL
causes an exception to be thrown, while IGNORE
silently ignores the transfer (although a DEBUG
log entry is produced).
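The intent of four of these modes can be illustrated against a local file system with plain Java NIO. This is only an analogy under stated assumptions: the Mode enum and the write helper are hypothetical, they cover only REPLACE, APPEND, IGNORE, and FAIL, and they operate on local files rather than on an SFTP session.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Local-file analogy for four of the modes; not the adapter's implementation. */
final class ExistsModeSketch {

    /** Hypothetical subset of the modes described above. */
    enum Mode { REPLACE, APPEND, IGNORE, FAIL }

    /** Writes content according to the mode; returns true if any bytes were written. */
    static boolean write(Path target, String content, Mode mode) throws IOException {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        if (Files.exists(target)) {
            switch (mode) {
                case IGNORE:
                    return false; // silently skip the transfer
                case FAIL:
                    throw new FileAlreadyExistsException(target.toString());
                case APPEND:
                    Files.write(target, bytes, StandardOpenOption.APPEND);
                    return true;
                default:
                    break; // REPLACE: continue and overwrite
            }
        }
        Files.write(target, bytes); // creates the file or truncates an existing one
        return true;
    }

    /** Runs each mode once against a temporary file and summarizes the outcome. */
    static String demo() {
        try {
            Path target = Files.createTempDirectory("exists-mode").resolve("data.txt");
            StringBuilder log = new StringBuilder();
            write(target, "a", Mode.REPLACE);
            write(target, "b", Mode.APPEND);
            log.append(read(target));
            log.append(write(target, "c", Mode.IGNORE) ? "|written" : "|ignored");
            try {
                write(target, "c", Mode.FAIL);
                log.append("|written");
            } catch (FileAlreadyExistsException ex) {
                log.append("|failed");
            }
            write(target, "d", Mode.REPLACE);
            log.append('|').append(read(target));
            return log.toString();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    private static String read(Path path) throws IOException {
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model, IGNORE and FAIL both leave the existing content untouched; the only difference is whether the caller sees an exception.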
Version 4.3 introduced the chmod
attribute, which you can use to change the remote file permissions after upload.
You can use the conventional Unix octal format (for example, 600
allows read-write for the file owner only).
When configuring the adapter with Java, you can use setChmodOctal("600")
or setChmod(0600)
.
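The octal string and the octal int literal denote the same value, which the following plain-Java sketch makes explicit. The ChmodOctalSketch class and its helper methods are hypothetical and exist only for illustration; they do not call the adapter.

```java
/** Shows how an octal permission string relates to a Java octal literal. */
final class ChmodOctalSketch {

    /** Parses an octal permission string such as "600" into its int value. */
    static int parseOctal(String octal) {
        return Integer.parseInt(octal, 8);
    }

    /** Renders the lowest nine permission bits in rwx notation. */
    static String toSymbolic(int mode) {
        String flags = "rwxrwxrwx";
        StringBuilder symbolic = new StringBuilder();
        for (int i = 0; i < 9; i++) {
            boolean set = (mode & (1 << (8 - i))) != 0;
            symbolic.append(set ? flags.charAt(i) : '-');
        }
        return symbolic.toString();
    }

    public static void main(String[] args) {
        System.out.println(parseOctal("600") == 0600); // the leading zero marks an octal literal
        System.out.println(toSymbolic(0600));
        System.out.println(toSymbolic(0644));
    }
}
```

Note that the leading zero matters in Java: the decimal literal 600 is a different number from the octal literal 0600, so passing 600 without the leading zero to an int-based setter would not express owner read-write permissions.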
Avoiding Partially Written Files
One of the common problems when dealing with file transfers is the possibility of processing a partial file. A file might appear in the file system before its transfer is actually complete.
To deal with this issue, Spring Integration SFTP adapters use a common algorithm in which files are transferred under a temporary name and then renamed once they are fully transferred.
By default, every file that is in the process of being transferred appears in the file system with an additional suffix, which, by default, is .writing.
You can change this suffix by setting the temporary-file-suffix attribute.
However, there may be situations where you do not want to use this technique (for example, if the server does not permit renaming files).
For situations like this, you can disable this feature by setting use-temporary-file-name
to false
(the default is true
).
When this attribute is false
, the file is written with its final name, and the consuming application needs some other mechanism to detect that the file is completely uploaded before accessing it.
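The write-then-rename technique can be sketched on a local file system with plain Java NIO. This is an analogy rather than the adapter's code: the TemporaryNameWrite class and its methods are hypothetical, and the rename here is a local Files.move instead of a remote SFTP rename.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Local-file sketch of "write under a temporary name, then rename". */
final class TemporaryNameWrite {

    /** Writes the content under name + suffix first, then renames it to the final name. */
    static Path writeThenRename(Path directory, String name, byte[] content, String suffix) {
        Path temporary = directory.resolve(name + suffix);
        Path target = directory.resolve(name);
        try {
            Files.write(temporary, content);
            // A consumer watching for the final name never sees a half-written file.
            return Files.move(temporary, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Returns true if only the final file exists after the transfer. */
    static boolean demo() {
        try {
            Path directory = Files.createTempDirectory("temporary-name");
            byte[] content = "hello".getBytes(StandardCharsets.UTF_8);
            Path result = writeThenRename(directory, "report.txt", content, ".writing");
            return Files.exists(result)
                    && result.getFileName().toString().equals("report.txt")
                    && !Files.exists(directory.resolve("report.txt.writing"));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A consumer that ignores names ending in the temporary suffix only ever sees complete files, which is why the technique depends on the server permitting renames.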
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the outbound adapter with Java:
@SpringBootApplication
@IntegrationComponentScan
public class SftpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(SftpJavaApplication.class)
                        .web(WebApplicationType.NONE)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToSftp(new File("/foo/bar.txt"));
    }

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port); // port is assumed to be defined elsewhere
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<SftpClient.DirEntry>(factory);
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel")
    public MessageHandler handler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        // The remote directory is taken from a header of each request message.
        handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
        handler.setFileNameGenerator(new FileNameGenerator() {

            @Override
            public String generateFileName(Message<?> message) {
                return "handlerContent.test";
            }

        });
        return handler;
    }

    @MessagingGateway
    public interface MyGateway {

        @Gateway(requestChannel = "toSftpChannel")
        void sendToSftp(File file);

    }

}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the outbound adapter with the Java DSL:
@SpringBootApplication
public class SftpJavaApplication {

    // Assumed to be defined as a bean elsewhere in the application.
    @Autowired
    private SessionFactory<SftpClient.DirEntry> sftpSessionFactory;

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow() {
        return IntegrationFlow.from("toSftpChannel")
                .handle(Sftp.outboundAdapter(this.sftpSessionFactory, FileExistsMode.FAIL)
                        .useTemporaryFileName(false)
                        .remoteDirectory("/foo"))
                .get();
    }

}
SFTP Outbound Gateway
The SFTP outbound gateway provides a limited set of commands that let you interact with a remote SFTP server:
- ls (list files)
- nlst (list file names)
- get (retrieve a file)
- mget (retrieve multiple files)
- rm (remove file(s))
- mv (move and rename file)
- put (send a file)
- mput (send multiple files)
Using the ls Command
ls lists remote files and supports the following options:
- -1: Retrieve a list of filenames. The default is to retrieve a list of FileInfo objects.
- -a: Include all files (including those starting with '.')
- -f: Do not sort the list
- -dirs: Include directories (excluded by default)
- -links: Include symbolic links (excluded by default)
- -R: List the remote directory recursively
In addition, filename filtering is provided in the same manner as the inbound-channel-adapter
.
The message payload resulting from an ls
operation is a list of file names or a list of FileInfo
objects (depending on whether you use the -1
switch).
These objects provide information such as modified time, permissions, and others.
The remote directory that the ls
command acted on is provided in the file_remoteDirectory
header.
When using the recursive option (-R
), the fileName
includes any subdirectory elements and represents the relative path to the file (relative to the remote directory).
If you use the -dirs
option, each recursive directory is also returned as an element in the list.
In this case, we recommend that you not use the -1
option, because you would not be able to distinguish files from directories, which you can do when you use FileInfo
objects.
If the remote path to list starts with a / symbol, SFTP treats it as an absolute path; otherwise, it is treated as a path relative to the current user's home directory.
Using the nlst Command
Version 5 introduced support for the nlst command.
nlst lists remote file names and supports only one option:
- -f: Do not sort the list
The message payload resulting from an nlst
operation is a list of file names.
The file_remoteDirectory
header holds the remote directory on which the nlst
command acted.
The SFTP protocol does not provide the ability to list names.
This command is the equivalent of the ls
command with the -1
option and is added here for convenience.
Using the get Command
get retrieves a remote file and supports the following options:
- -P: Preserve the timestamp of the remote file.
- -stream: Retrieve the remote file as a stream.
- -D: Delete the remote file after successful transfer. The remote file is not deleted if the transfer is ignored because the FileExistsMode is IGNORE and the local file already exists.
The file_remoteDirectory
header holds the remote directory, and the file_remoteFile
header holds the filename.
The message payload resulting from a get
operation is a File
object representing the retrieved file.
If you use the -stream
option, the payload is an InputStream
rather than a File
.
For text files, a common use case is to combine this operation with a file splitter or a stream transformer.
When consuming remote files as streams, you are responsible for closing the Session
after the stream is consumed.
For convenience, the Session
is provided in the closeableResource
header, and IntegrationMessageHeaderAccessor
offers a convenience method:
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
Framework components, such as the File Splitter and Stream Transformer, automatically close the session after the data is transferred.
The following example shows how to consume a file as a stream:
<int-sftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />
<int-file:splitter input-channel="stream" output-channel="lines" />
If you consume the input stream in a custom component, you must close the Session.
You can either do that in your custom code or route a copy of the message to a service-activator and use SpEL, as the following example shows:
<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />
Using the mget Command
mget retrieves multiple remote files based on a pattern and supports the following options:
- -P: Preserve the timestamps of the remote files.
- -R: Retrieve the entire directory tree recursively.
- -x: Throw an exception if no files match the pattern (otherwise, an empty list is returned).
- -D: Delete each remote file after successful transfer. The remote file is not deleted if the transfer is ignored because the FileExistsMode is IGNORE and the local file already exists.
The message payload resulting from an mget
operation is a List<File>
object (that is, a List
of File
objects, each representing a retrieved file).
Starting with version 5.0, if the FileExistsMode is IGNORE, the payload of the output message no longer contains files that were not fetched because the file already existed.
Previously, the list contained all files, including those that already existed.
The expression you use to determine the remote path should produce a result that ends with * (for example, myfiles/* fetches the complete tree under myfiles).
Starting with version 5.0, you can use a recursive MGET
, combined with the FileExistsMode.REPLACE_IF_MODIFIED
mode, to periodically synchronize an entire remote directory tree locally.
This mode sets the local file’s last modified timestamp to the remote file’s timestamp, regardless of the -P
(preserve timestamp) option.
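Notes for when using recursion (-R):
The pattern is ignored and * is assumed.
If you filter a subdirectory, no additional traversal of that subdirectory is performed.
Typically, you would use the #remoteDirectory variable in the local-directory-expression so that the remote directory structure is retained locally.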
The persistent file list filters now have a boolean property forRecursion.
Setting this property to true also sets alwaysAcceptDirectories, which means that the recursive operation on the outbound gateways (ls and mget) now always traverses the full directory tree each time.
This is to solve a problem where changes deep in the directory tree were not detected.
In addition, forRecursion=true
causes the full path to files to be used as the metadata store keys; this solves a problem where the filter did not work properly if a file with the same name appears multiple times in different directories.
IMPORTANT: This means that existing keys in a persistent metadata store will not be found for files beneath the top level directory.
For this reason, the property is false
by default; this may change in a future release.
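The key collision that full-path keys avoid can be shown with a deliberately simplified plain-Java model. The AcceptOnceKeySketch class is hypothetical: it uses an in-memory set instead of a metadata store and ignores timestamps, so it only illustrates the choice of key, not the behavior of the persistent filters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified accept-once model that differs only in how the key is derived. */
final class AcceptOnceKeySketch {

    /** Accepts each path whose key has not been seen before. */
    static List<String> accept(List<String> remotePaths, boolean fullPathKeys) {
        Set<String> seenKeys = new HashSet<>();
        List<String> accepted = new ArrayList<>();
        for (String path : remotePaths) {
            // Key is either the whole path or only the part after the last '/'.
            String key = fullPathKeys ? path : path.substring(path.lastIndexOf('/') + 1);
            if (seenKeys.add(key)) {
                accepted.add(path);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList("a/report.txt", "b/report.txt");
        System.out.println(accept(paths, false)); // file-name keys: the second file is rejected
        System.out.println(accept(paths, true));  // full-path keys: both files are accepted
    }
}
```

The same change of key also explains the compatibility warning: entries that were stored under the bare file name no longer match once the full path is used as the key.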
Starting with version 5.0, you can configure the SftpSimplePatternFileListFilter
and SftpRegexPatternFileListFilter
to always pass directories by setting the alwaysAcceptDirectories
to true
.
Doing so allows recursion for a simple pattern, as the following examples show:
<bean id="starDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
You can provide one of these filters by using the filter
property on the gateway.
Using the put Command
put
sends a file to the remote server.
The payload of the message can be a java.io.File
, a byte[]
, or a String
.
A remote-filename-generator
(or expression) is used to name the remote file.
Other available attributes include remote-directory, temporary-remote-directory, and their *-expression equivalents, as well as use-temporary-file-name and auto-create-directory.
See the schema documentation for more information.
The message payload resulting from a put
operation is a String
that contains the full path of the file on the server after transfer.
Version 4.3 introduced the chmod
attribute, which changes the remote file permissions after upload.
You can use the conventional Unix octal format (for example, 600
allows read-write for the file owner only).
When configuring the adapter with Java, you can use setChmod(0600)
.
Using the mput Command
mput sends multiple files to the server and supports the following option:
- -R: Recursive. Send all files (possibly filtered) in the directory and subdirectories.
The message payload must be a java.io.File
(or String
) that represents a local directory.
Since version 5.1, a collection of File
or String
is also supported.
The same attributes as the put
command are supported.
In addition, you can filter files in the local directory with one of mput-pattern
, mput-regex
, mput-filter
, or mput-filter-expression
.
The filter works with recursion, as long as the subdirectories themselves pass the filter.
Subdirectories that do not pass the filter are not recursed.
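The consequence of applying one filter to both files and subdirectories can be sketched with a plain-Java directory walk. The FilteredRecursiveWalk class is a hypothetical illustration over the local file system and does not use the gateway; the file names in demo() are made up.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Recursive walk in which a rejected subdirectory is never entered. */
final class FilteredRecursiveWalk {

    /** Returns the relative paths of all files that pass the filter. */
    static List<String> collect(File directory, Predicate<File> filter) {
        List<String> result = new ArrayList<>();
        collect(directory, "", filter, result);
        return result;
    }

    private static void collect(File directory, String prefix,
            Predicate<File> filter, List<String> result) {
        File[] children = directory.listFiles();
        if (children == null) {
            return; // not a directory, or not readable
        }
        Arrays.sort(children); // deterministic order for the example
        for (File child : children) {
            if (!filter.test(child)) {
                continue; // applies to files and directories alike
            }
            if (child.isDirectory()) {
                collect(child, prefix + child.getName() + "/", filter, result);
            } else {
                result.add(prefix + child.getName());
            }
        }
    }

    /** Builds a small tree and walks it with a filter that accepts *.txt and the "sub" directory. */
    static List<String> demo() {
        try {
            Path root = Files.createTempDirectory("mput-sketch");
            Files.createDirectories(root.resolve("sub"));
            Files.createDirectories(root.resolve("other"));
            Files.write(root.resolve("a.txt"), new byte[0]);
            Files.write(root.resolve("skip.log"), new byte[0]);
            Files.write(root.resolve("sub").resolve("b.txt"), new byte[0]);
            Files.write(root.resolve("other").resolve("c.txt"), new byte[0]);
            return collect(root.toFile(),
                    f -> f.getName().endsWith(".txt") || f.getName().equals("sub"));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this example other/c.txt is never found even though it matches *.txt, because the directory other does not pass the filter itself.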
The message payload resulting from an mput
operation is a List<String>
object (that is, a List
of remote file paths resulting from the transfer).
Version 4.3 introduced the chmod
attribute, which lets you change the remote file permissions after upload.
You can use the conventional Unix octal format (for example, 600
allows read-write for the file owner only).
When configuring the adapter with Java, you can use setChmodOctal("600")
or setChmod(0600)
.
Using the rm Command
The rm
command has no options.
If the remove operation was successful, the resulting message payload is Boolean.TRUE
.
Otherwise, the message payload is Boolean.FALSE
.
The file_remoteDirectory
header holds the remote directory, and the file_remoteFile
header holds the file name.
Using the mv Command
The mv
command has no options.
The expression
attribute defines the “from” path, and the rename-expression
attribute defines the “to” path.
By default, the rename-expression
is headers['file_renameTo']
.
This expression must not evaluate to null or an empty String
.
If necessary, any remote directories needed are created.
The payload of the result message is Boolean.TRUE
.
The file_remoteDirectory
header holds the original remote directory, and the file_remoteFile
header holds the filename.
The file_renameTo
header holds the new path.
Starting with version 5.5.6, the remoteDirectoryExpression
can be used in the mv
command for convenience.
If the “from” file is not a full file path, the result of remoteDirectoryExpression
is used as the remote directory.
The same applies for the “to” file, for example, if the task is just to rename a remote file in some directory.
Additional Command Information
The get
and mget
commands support the local-filename-generator-expression
attribute.
It defines a SpEL expression to generate the names of local files during the transfer.
The root object of the evaluation context is the request message.
The remoteFileName
variable is also available.
It is particularly useful for mget
(for example: local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo"
).
The get
and mget
commands support the local-directory-expression
attribute.
It defines a SpEL expression to generate the names of local directories during the transfer.
The root object of the evaluation context is the request message.
The remoteDirectory
variable is also available.
It is particularly useful for mget (for example: local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.myheader"
).
This attribute is mutually exclusive with the local-directory
attribute.
For all commands, the expression property of the gateway holds the path on which the command acts.
For the mget command, the expression might evaluate to * (meaning retrieve all files), to somedirectory/*, or to another value that ends with *.
The following example shows a gateway configured for an ls
command:
<int-sftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>
The payload of the message sent to the toSplitter
channel is a list of String
objects, each of which contains the name of a file.
If you omitted command-options="-1"
, the payload would be a list of FileInfo
objects.
You can provide options as a space-delimited list (for example, command-options="-1 -dirs -links"
).
Starting with version 4.2, the GET
, MGET
, PUT
, and MPUT
commands support a FileExistsMode
property (mode
when using the namespace support).
This affects the behavior when the local file exists (GET
and MGET
) or the remote file exists (PUT
and MPUT
).
The supported modes are REPLACE
, APPEND
, FAIL
, and IGNORE
.
For backwards compatibility, the default mode for PUT
and MPUT
operations is REPLACE
.
For GET
and MGET
operations, the default is FAIL
.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the outbound gateway with Java:
@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        // sftpSessionFactory() is assumed to be a session factory bean method defined elsewhere
        return new SftpOutboundGateway(sftpSessionFactory(), "ls", "'my_remote_dir/'");
    }

}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the outbound gateway with the Java DSL:
@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port); // port is assumed to be defined elsewhere
        sf.setUser("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<>(sf);
    }

    @Bean
    public QueueChannelSpec remoteFileOutputChannel() {
        return MessageChannels.queue();
    }

    @Bean
    public IntegrationFlow sftpMGetFlow() {
        return IntegrationFlow.from("sftpMgetInputChannel")
                .handle(Sftp.outboundGateway(sftpSessionFactory(),
                                AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
                        .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
                        .regexFileNameFilter("(subSftpSource|.*1.txt)")
                        .localDirectoryExpression("'myDir/' + #remoteDirectory")
                        .localFilenameExpression("#remoteFileName.replaceFirst('sftpSource', 'localTarget')"))
                .channel("remoteFileOutputChannel")
                .get();
    }

}
Outbound Gateway Partial Success (mget and mput)
When performing operations on multiple files (by using mget
and mput
) an exception can occur some time after one or more files have been transferred.
In this case (starting with version 4.2), a PartialSuccessException
is thrown.
As well as the usual MessagingException
properties (failedMessage
and cause
), this exception has two additional properties:
- partialResults: The successful transfer results.
- derivedInput: The list of files generated from the request message (such as local files to transfer for an mput).
These attributes let you determine which files were successfully transferred and which were not.
In the case of a recursive mput
, the PartialSuccessException
may have nested PartialSuccessException
instances.
Consider the following directory structure:
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
If the exception occurs on file3.txt
, the PartialSuccessException
thrown by the gateway has derivedInput
of file1.txt
, subdir
, and zoo.txt
and partialResults
of file1.txt
.
Its cause
is another PartialSuccessException
with derivedInput
of file2.txt
and file3.txt
and partialResults
of file2.txt
.
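Walking such a nested failure to find out what was actually transferred can be sketched in plain Java. The PartialSuccess class below is a hypothetical stand-in that only mirrors the two properties described above; it is not the framework's PartialSuccessException, and the example() data simply restates the directory structure from this section.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of a nested partial-success failure. */
final class PartialSuccessSketch {

    /** Hypothetical stand-in exposing partialResults and derivedInput. */
    static final class PartialSuccess extends RuntimeException {

        private final List<String> partialResults;
        private final List<String> derivedInput;

        PartialSuccess(String message, List<String> partialResults,
                List<String> derivedInput, Throwable cause) {
            super(message, cause);
            this.partialResults = new ArrayList<>(partialResults);
            this.derivedInput = new ArrayList<>(derivedInput);
        }

        List<String> getPartialResults() {
            return this.partialResults;
        }

        List<String> getDerivedInput() {
            return this.derivedInput;
        }
    }

    /** Follows the cause chain and gathers every successfully transferred file. */
    static List<String> allTransferred(Throwable failure) {
        List<String> transferred = new ArrayList<>();
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof PartialSuccess) {
                transferred.addAll(((PartialSuccess) t).getPartialResults());
            }
        }
        return transferred;
    }

    /** Rebuilds the failure on file3.txt from the directory structure shown above. */
    static PartialSuccess example() {
        PartialSuccess nested = new PartialSuccess("subdir failed",
                Arrays.asList("file2.txt"),
                Arrays.asList("file2.txt", "file3.txt"),
                new IOException("transfer of file3.txt failed"));
        return new PartialSuccess("root failed",
                Arrays.asList("file1.txt"),
                Arrays.asList("file1.txt", "subdir", "zoo.txt"),
                nested);
    }

    public static void main(String[] args) {
        System.out.println(allTransferred(example()));
    }
}
```

A retry strategy could compare each level's derivedInput with its partialResults to decide what still needs to be sent; in this example that would be file3.txt inside subdir, plus zoo.txt at the top level.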
MessageSessionCallback
Starting with Spring Integration version 4.2, you can use a MessageSessionCallback<F, T>
implementation with the <int-sftp:outbound-gateway/>
(SftpOutboundGateway
) to perform any operation on the Session<SftpClient.DirEntry>
with the requestMessage
context.
You can use it for any non-standard or low-level SFTP operation (or several). It can be accessed from an integration flow definition, and its implementation can be injected as a functional interface (lambda).
The following example uses a lambda:
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler sftpOutboundGateway(SessionFactory<SftpClient.DirEntry> sessionFactory) {
    // The payload is expected to be the remote path to list, so it is cast to String.
    return new SftpOutboundGateway(sessionFactory,
            (session, requestMessage) -> session.list((String) requestMessage.getPayload()));
}
Another example might be to pre- or post-process the file data being sent or retrieved.
When using XML configuration, the <int-sftp:outbound-gateway/>
provides a session-callback
attribute that lets you specify the MessageSessionCallback
bean name.
The session-callback is mutually exclusive with the command and expression attributes.
When configuring with Java, the SftpOutboundGateway class offers different constructors.
Apache Mina SFTP Server Events
The ApacheMinaSftpEventListener, added in version 5.2, listens for certain Apache Mina SFTP server events and publishes them as ApplicationEvent instances, which can be received by any ApplicationListener bean, @EventListener bean method, or Event Inbound Channel Adapter.
Currently, supported events are:
- SessionOpenedEvent: a client session was opened
- DirectoryCreatedEvent: a directory was created
- FileWrittenEvent: a file was written to
- PathMovedEvent: a file or directory was renamed
- PathRemovedEvent: a file or directory was removed
- SessionClosedEvent: the client has disconnected
Each of these is a subclass of ApacheMinaSftpEvent
; you can configure a single listener to receive all the event types.
The source
property of each event is a ServerSession
, from which you can obtain information such as the client address; a convenient getSession()
method is provided on the abstract event.
To configure the server with the listener (which must be a Spring bean), simply add it to the SftpSubsystemFactory
:
server = SshServer.setUpDefaultServer();
...
SftpSubsystemFactory sftpFactory = new SftpSubsystemFactory();
sftpFactory.addSftpEventListener(apacheMinaSftpEventListenerBean);
...
To consume these events using a Spring Integration event adapter:
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaSftpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
Remote File Information
Starting with version 5.2, the SftpStreamingMessageSource (SFTP Streaming Inbound Channel Adapter), SftpInboundFileSynchronizingMessageSource (SFTP Inbound Channel Adapter), and the "read" commands of the SftpOutboundGateway (SFTP Outbound Gateway) add headers to the messages they produce with information about the remote file:
- FileHeaders.REMOTE_HOST_PORT: the host:port pair that the remote session was connected to during the file transfer operation
- FileHeaders.REMOTE_DIRECTORY: the remote directory in which the operation was performed
- FileHeaders.REMOTE_FILE: the remote file name; applicable only to single-file operations
Since the SftpInboundFileSynchronizingMessageSource produces messages from local copies rather than directly from remote files, the AbstractInboundFileSynchronizer stores information about the remote file in the MetadataStore (which can be configured externally) in URI style (protocol://host:port/remoteDirectory#remoteFileName) during the synchronization operation.
This metadata is retrieved by the SftpInboundFileSynchronizingMessageSource when the local file is polled.
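A value in this URI style can be taken apart with the JDK's java.net.URI class, as the following sketch shows. The sample value and the RemoteFileMetadataUri class are hypothetical, and the sketch assumes the stored value is a well-formed URI (for example, that the file name contains no characters that would need escaping).

```java
import java.net.URI;

/** Splits a protocol://host:port/remoteDirectory#remoteFileName value into its parts. */
final class RemoteFileMetadataUri {

    /** Returns a readable summary of the parts of the metadata value. */
    static String describe(String metadataValue) {
        URI uri = URI.create(metadataValue);
        return uri.getScheme() + " " + uri.getHost() + ":" + uri.getPort()
                + " dir=" + uri.getPath() + " file=" + uri.getFragment();
    }

    public static void main(String[] args) {
        // Hypothetical sample value in the documented URI style.
        System.out.println(describe("sftp://localhost:22/remote/inbox#report.txt"));
    }
}
```

The remote directory maps to the URI path and the remote file name to the URI fragment, so both can be recovered independently of the host and port.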
When local file is deleted, it is recommended to remove its metadata entry.
The AbstractInboundFileSynchronizer
provides a removeRemoteFileMetadata()
callback for this purpose.
In addition, you can call setMetadataStorePrefix()
to set a prefix to be used in the metadata keys.
It is recommended to have this prefix be different from the one used in the MetadataStore
-based FileListFilter
implementations, when the same MetadataStore
instance is shared between these components, to avoid entry overriding because both filter and AbstractInboundFileSynchronizer
use the same local file name for the metadata entry key.