FTP Streaming Inbound Channel Adapter
Version 4.3 introduced the streaming inbound channel adapter.
This adapter produces messages with payloads of type InputStream, letting files be fetched without writing to the local file system.
Since the session remains open, the consuming application is responsible for closing the session when the file has been
consumed.
The session is provided in the closeableResource header (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE).
Standard framework components, such as the FileSplitter and StreamTransformer, automatically close the session.
See File Splitter and Stream Transformer for more information about these components.
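For a custom consumer, the order of operations matters: read the stream to the end, then close the session. The following minimal sketch uses only the JDK to illustrate that order. The class and method names are hypothetical, and it assumes the Closeable argument is the value taken from the closeableResource header (for example, through IntegrationMessageHeaderAccessor.getCloseableResource()).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Spring Integration.
class StreamingPayloadConsumer {

    // Reads the whole payload as UTF-8 text, then closes the stream and the session.
    // try-with-resources closes in reverse declaration order: stream first, session last.
    static String readAndClose(InputStream payload, Closeable session) throws IOException {
        try (Closeable s = session; InputStream in = payload) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-ins for the message payload and the closeableResource header value.
        InputStream payload = new ByteArrayInputStream("line 1\nline 2".getBytes(StandardCharsets.UTF_8));
        Closeable session = () -> System.out.println("session closed");
        System.out.println(readAndClose(payload, session));
    }
}
```

Because both resources are declared in the try header, the session is closed even if reading the payload throws.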
The following example shows how to configure an inbound-streaming-channel-adapter:
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
Only one of filename-pattern, filename-regex, filter, or filter-expression is allowed.
Starting with version 5.0, by default, the FtpStreamingMessageSource adapter prevents duplicates for remote files by using an FtpPersistentAcceptOnceFileListFilter backed by the in-memory SimpleMetadataStore.
By default, this filter is also applied with the filename pattern (or regex).
If you need to allow duplicates, you can use AcceptAllFileListFilter.
Any other use cases can be handled by CompositeFileListFilter (or ChainFileListFilter).
The Java configuration (later in the document) shows one technique to remove the remote file after processing to avoid duplicates.
For more information about the FtpPersistentAcceptOnceFileListFilter, and how it is used, see Remote Persistent File List Filters.
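As an illustration of combining filters, the following sketch chains a name pattern with a persistent accept-once filter in Java. It is a configuration fragment that needs spring-integration-ftp on the classpath. The class name, the "ftpStream-" key prefix, and the remote directory are assumptions made for the example, and the in-memory SimpleMetadataStore would typically be replaced by a shared store when several instances poll the same directory.

```java
import org.apache.commons.net.ftp.FTPFile;

import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter;
import org.springframework.integration.ftp.inbound.FtpStreamingMessageSource;
import org.springframework.integration.ftp.session.FtpRemoteFileTemplate;
import org.springframework.integration.metadata.SimpleMetadataStore;

// Hypothetical factory class for illustration.
public class FilteredStreamingSourceFactory {

    public static FtpStreamingMessageSource create(FtpRemoteFileTemplate template) {
        // Filters in a chain run in order; a file reaches the accept-once
        // filter only if the pattern filter has already accepted it.
        ChainFileListFilter<FTPFile> chain = new ChainFileListFilter<>();
        chain.addFilter(new FtpSimplePatternFileListFilter("*.txt"));
        chain.addFilter(new FtpPersistentAcceptOnceFileListFilter(
                new SimpleMetadataStore(), "ftpStream-")); // key prefix is an assumption

        FtpStreamingMessageSource source = new FtpStreamingMessageSource(template);
        source.setRemoteDirectory("foo/bar");
        source.setFilter(chain);
        source.setMaxFetchSize(1);
        return source;
    }
}
```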
Use the max-fetch-size attribute to limit the number of files fetched on each poll when a fetch is necessary.
Set it to 1 and use a persistent filter when running in a clustered environment.
See Inbound Channel Adapters: Controlling Remote File Fetching for more information.
The adapter puts the remote directory and file name in the FileHeaders.REMOTE_DIRECTORY and FileHeaders.REMOTE_FILE headers, respectively.
Starting with version 5.0, the FileHeaders.REMOTE_FILE_INFO header provides additional remote file information (represented in JSON by default).
If you set the fileInfoJson property on the FtpStreamingMessageSource to false, the header contains an FtpFileInfo object.
The FTPFile object provided by the underlying Apache Net library can be accessed by using the FtpFileInfo.getFileInfo() method.
The fileInfoJson property is not available when you use XML configuration, but you can set it by injecting the FtpStreamingMessageSource into one of your configuration classes.
See also Remote File Information.
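A minimal sketch of that injection approach follows, assuming the application context contains exactly one FtpStreamingMessageSource bean so that it can be injected by type. The class and method names are hypothetical, and the fragment needs spring-integration-ftp on the classpath.

```java
import org.apache.commons.net.ftp.FTPFile;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.ftp.inbound.FtpStreamingMessageSource;
import org.springframework.integration.ftp.session.FtpFileInfo;
import org.springframework.messaging.Message;

// Hypothetical configuration class for illustration.
@Configuration
public class RemoteFileInfoConfig {

    // Switches the REMOTE_FILE_INFO header from a JSON string to an FtpFileInfo object.
    @Autowired
    public void disableJson(FtpStreamingMessageSource source) {
        source.setFileInfoJson(false);
    }

    // Reads the raw FTPFile from a message produced by the adapter.
    // Returns null if the header is absent or is not an FtpFileInfo.
    public static FTPFile rawFile(Message<?> message) {
        Object header = message.getHeaders().get(FileHeaders.REMOTE_FILE_INFO);
        return header instanceof FtpFileInfo ? ((FtpFileInfo) header).getFileInfo() : null;
    }
}
```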
Starting with version 5.1, the generic type of the comparator is FTPFile.
Previously, it was AbstractFileInfo<FTPFile>.
This is because the sort is now performed earlier in the processing, before filtering and applying maxFetch.
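The order of these steps can be pictured with a JDK-only sketch that sorts a listing, then filters it, then limits it. RemoteEntry and FetchOrderDemo are illustrative stand-ins, not Spring Integration or Apache Commons Net types. In the adapter itself, the comparator would be a Comparator<FTPFile>, since the sort sees the raw listing entries.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical demo class; mirrors the sort -> filter -> max fetch order only.
class FetchOrderDemo {

    // Stand-in for a raw remote listing entry.
    static final class RemoteEntry {
        final String name;
        final long modified;

        RemoteEntry(String name, long modified) {
            this.name = name;
            this.modified = modified;
        }
    }

    // Sorts the raw listing first, then filters, then keeps at most maxFetch names.
    static List<String> select(List<RemoteEntry> listing, Comparator<RemoteEntry> comparator,
            Predicate<RemoteEntry> filter, int maxFetch) {
        return listing.stream()
                .sorted(comparator)
                .filter(filter)
                .limit(maxFetch)
                .map(e -> e.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<RemoteEntry> listing = Arrays.asList(
                new RemoteEntry("c.txt", 300L),
                new RemoteEntry("a.log", 100L),
                new RemoteEntry("b.txt", 200L));
        // Oldest first, only .txt files, one file per poll.
        System.out.println(select(listing, Comparator.comparingLong(e -> e.modified),
                e -> e.name.endsWith(".txt"), 1)); // prints [b.txt]
    }
}
```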
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:
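import java.io.InputStream;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.ftp.inbound.FtpStreamingMessageSource;
import org.springframework.integration.ftp.session.FtpRemoteFileTemplate;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.transformer.StreamTransformer;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(WebApplicationType.NONE) // replaces the deprecated web(false) form
            .run(args);
    }

    // Streams each remote file as an InputStream payload, one file per poll.
    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("ftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    // Reads the stream into a String and closes the session.
    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    // ftpSessionFactory() is not shown here; it is expected to be defined elsewhere.
    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(ftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    // Removes the remote file once the handler has processed the message successfully.
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}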
Notice that, in this example, the message handler downstream of the transformer has an advice that removes the remote file after processing.
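One detail worth noting is that the expression in the advice concatenates the two headers directly, so it relies on the remote directory ending with a separator (as "ftpSource/" does in the example). The following JDK-only sketch shows a more defensive way to build the path. RemotePaths.join is a hypothetical helper, not part of Spring Integration.

```java
// Hypothetical helper for illustration; not part of Spring Integration.
class RemotePaths {

    // Joins a remote directory and a file name, inserting the separator only when needed.
    static String join(String remoteDirectory, String remoteFile, String separator) {
        if (remoteDirectory == null || remoteDirectory.isEmpty()) {
            return remoteFile;
        }
        return remoteDirectory.endsWith(separator)
                ? remoteDirectory + remoteFile
                : remoteDirectory + separator + remoteFile;
    }

    public static void main(String[] args) {
        // Both calls yield the same path regardless of the trailing separator.
        System.out.println(join("ftpSource/", "report.txt", "/"));
        System.out.println(join("ftpSource", "report.txt", "/"));
    }
}
```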