This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.4.0! |
SFTP Inbound Channel Adapter
The SFTP inbound channel adapter is a special listener that connects to the server and listens for the remote directory events (such as a new file being created), at which point it initiates a file transfer. The following example shows how to configure an SFTP inbound channel adapter:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
The preceding configuration example shows how to provide values for various attributes, including the following:
-
local-directory
: The location to which files are going to be transferred -
remote-directory
: The remote source directory from which files are going to be transferred -
session-factory
: A reference to the bean we configured earlier
By default, the transferred file carries the same name as the original file.
If you want to override this behavior, you can set the local-filename-generator-expression
attribute, which lets you provide a SpEL expression to generate the name of the local file.
Unlike outbound gateways and adapters, where the root object of the SpEL evaluation context is a Message
, this inbound adapter does not yet have the message at the time of evaluation, since that is what it ultimately generates with the transferred file as its payload.
Consequently, the root object of the SpEL evaluation context is the original name of the remote file (a String
).
The inbound channel adapter first retrieves the file to a local directory and then emits each file according to the poller configuration.
Starting with version 5.0, you can limit the number of files fetched from the SFTP server when new file retrievals are needed.
This can be beneficial when the target files are large or when running in a clustered system with a persistent file list filter, discussed later in this section.
Use max-fetch-size
for this purpose.
A negative value (the default) means no limit and all matching files are retrieved.
See Inbound Channel Adapters: Controlling Remote File Fetching for more information.
Since version 5.0, you can also provide a custom DirectoryScanner
implementation to the inbound-channel-adapter
by setting the scanner
attribute.
Starting with Spring Integration 3.0, you can specify the preserve-timestamp
attribute (the default is false
).
When true
, the local file’s modified timestamp is set to the value retrieved from the server.
Otherwise, it is set to the current time.
Starting with version 4.2, you can specify remote-directory-expression
instead of remote-directory
, which lets you dynamically determine the directory on each poll — for example, remote-directory-expression="@myBean.determineRemoteDir()"
.
Sometimes, file filtering based on the simple pattern specified via filename-pattern
attribute might not suffice.
If this is the case, you can use the filename-regex
attribute to specify a regular expression (for example, filename-regex=".*\.test$"
).
If you need complete control, you can use the filter
attribute to provide a reference to a custom implementation of the org.springframework.integration.file.filters.FileListFilter
, which is a strategy interface for filtering a list of files.
This filter determines which remote files are retrieved.
You can also combine a pattern-based filter with other filters (such as an AcceptOnceFileListFilter
, to avoid synchronizing files that have previously been fetched) by using a CompositeFileListFilter
.
The AcceptOnceFileListFilter
stores its state in memory.
If you wish the state to survive a system restart, consider using the SftpPersistentAcceptOnceFileListFilter
instead.
This filter stores the accepted file names in an instance of the MetadataStore
strategy (see Metadata Store).
This filter matches on the filename and the remote modified time.
Since version 4.0, this filter requires a ConcurrentMetadataStore
.
When used with a shared data store (such as Redis
with the RedisMetadataStore
), this lets filter keys be shared across multiple application or server instances.
Starting with version 5.0, the SftpPersistentAcceptOnceFileListFilter
with an in-memory SimpleMetadataStore
is applied by default for the SftpInboundFileSynchronizer
.
This filter is also applied, together with the regex
or pattern
option in the XML configuration, as well as through SftpInboundChannelAdapterSpec
in Java DSL.
You can handle any other use-cases by using CompositeFileListFilter
(or ChainFileListFilter
).
The above discussion refers to filtering the files before retrieving them. Once the files have been retrieved, an additional filter is applied to the files on the file system. By default, this is an`AcceptOnceFileListFilter`, which, as discussed in this section, retains state in memory and does not consider the file’s modified time. Unless your application removes files after processing, the adapter re-processes the files on disk by default after an application restart.
Also, if you configure the filter
to use a SftpPersistentAcceptOnceFileListFilter
and the remote file timestamp changes (causing it to be re-fetched), the default local filter does not allow this new file to be processed.
For more information about this filter, and how it is used, see Remote Persistent File List Filters.
You can use the local-filter
attribute to configure the behavior of the local file system filter.
Starting with version 4.3.8, a FileSystemPersistentAcceptOnceFileListFilter
is configured by default.
This filter stores the accepted file names and modified timestamp in an instance of the MetadataStore
strategy (see Metadata Store) and detects changes to the local file modified time.
The default MetadataStore
is a SimpleMetadataStore
that stores state in memory.
Since version 4.1.5, these filters have a new property called flushOnUpdate
, which causes them to flush the
metadata store on every update (if the store implements Flushable
).
Further, if you use a distributed MetadataStore (such as Redis Metadata Store), you can have multiple instances of the same adapter or application and be sure that one and only one instance processes a file.
|
The actual local filter is a CompositeFileListFilter
that contains the supplied filter and a pattern filter that prevents processing files that are in the process of being downloaded (based on the temporary-file-suffix
).
Files are downloaded with this suffix (the default is .writing
), and the files are renamed to their final names when the transfer is complete, making them 'visible' to the filter.
See the schema for more detail on these attributes.
SFTP inbound channel adapter is a polling consumer.
Therefore, you must configure a poller (either a global default or a local element).
Once the file has been transferred to a local directory, a message with java.io.File
as its payload type is generated and sent to the channel identified by the channel
attribute.
Starting with version 6.2, you can filter SFTP files based on last-modified strategy using SftpLastModifiedFileListFilter
.
This filter can be configured with an age
property so that only files older than this value are passed by the filter.
The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches).
Look into its Javadoc for more information.
More on File Filtering and Large Files
Sometimes, a file that just appeared in the monitored (remote) directory is not complete.
Typically, such a file is written with some temporary extension (such as .writing
on a file named something.txt.writing
) and then renamed after the writing process completes.
In most cases, developers are interested only in files that are complete and would like to filter only those files.
To handle these scenarios, you can use the filtering support provided by the filename-pattern
, filename-regex
, and filter
attributes.
If you need a custom filter implementation, you can include a reference in your adapter by setting the filter
attribute.
The following example shows how to do so:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
Recovering from Failures
You should understand the architecture of the adapter.
A file synchronizer fetches the files, and a FileReadingMessageSource
emits a message for each synchronized file.
As discussed earlier, two filters are involved.
The filter
attribute (and patterns) refers to the remote (SFTP) file list, to avoid fetching files that have already been fetched.
the FileReadingMessageSource
uses the local-filter
to determine which files are to be sent as messages.
The synchronizer lists the remote files and consults its filter.
The files are then transferred.
If an IO error occurs during file transfer, any files that have already been added to the filter are removed so that they are eligible to be re-fetched on the next poll.
This applies only if the filter implements ReversibleFileListFilter
(such as the AcceptOnceFileListFilter
).
If, after synchronizing the files, an error occurs on the downstream flow processing a file, no automatic rollback of the filter occurs, so the failed file is not reprocessed by default.
If you wish to reprocess such files after a failure, you can use a configuration similar to the following to facilitate the removal of the failed file from the filter:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
The preceding configuration works for any ResettableFileListFilter
.
Starting with version 5.0, the inbound channel adapter can build sub-directories locally, according to the generated local file name.
That can be a remote sub-path as well.
To be able to read a local directory recursively for modification according to the hierarchy support, you can now supply an internal FileReadingMessageSource
with a new RecursiveDirectoryScanner
based on the Files.walk()
algorithm.
See AbstractInboundFileSynchronizingMessageSource.setScanner()
for more information.
Also, you can now switch the AbstractInboundFileSynchronizingMessageSource
to the WatchService
-based DirectoryScanner
by using setUseWatchService()
option.
It is also configured for all the WatchEventType
instances to react for any modifications in local directory.
The reprocessing sample shown earlier is based on the built-in functionality of the FileReadingMessageSource.WatchServiceDirectoryScanner
, which uses ResettableFileListFilter.remove()
when the file is deleted (StandardWatchEventKinds.ENTRY_DELETE
) from the local directory.
See WatchServiceDirectoryScanner
for more information.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound adapter with Java:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
Dealing With Incomplete Data
The SftpSystemMarkerFilePresentFileListFilter
is provided to filter remote files that don’t have the corresponding marker file on the remote system.
See the Javadoc for configuration information.