FTP/FTPS Adapters
Spring Integration provides support for file transfer operations with FTP and FTPS.
The File Transfer Protocol (FTP) is a simple network protocol that lets you transfer files between two computers on the Internet. FTPS stands for “FTP over SSL”.
You need to include this dependency into your project:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>5.2.4.RELEASE</version>
</dependency>
compile "org.springframework.integration:spring-integration-ftp:5.2.4.RELEASE"
There are two actors when it comes to FTP communication: client and server. To transfer files with FTP or FTPS, you use a client that initiates a connection to a remote computer that is running an FTP server. After the connection is established, the client can choose to send or receive copies of files.
Spring Integration supports sending and receiving files over FTP or FTPS by providing three client-side endpoints: inbound channel adapter, outbound channel adapter, and outbound gateway. It also provides convenient namespace-based configuration options for defining these client components.
To use the FTP namespace, add the following to the header of your XML file:
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"
FTP Session Factory
Spring Integration provides factories you can use to create FTP (or FTPS) sessions.
Default Factories
Starting with version 3.0, sessions are no longer cached by default. See FTP Session Caching. |
Before configuring FTP adapters, you must configure an FTP session factory.
You can configure the FTP Session Factory with a regular bean definition where the implementation class is o.s.i.ftp.session.DefaultFtpSessionFactory
.
The following example shows a basic configuration:
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="kermit"/>
<property name="password" value="frog"/>
<property name="clientMode" value="0"/>
<property name="fileType" value="2"/>
<property name="bufferSize" value="100000"/>
</bean>
For FTPS connections, you can use o.s.i.ftp.session.DefaultFtpsSessionFactory
instead.
The following example shows a complete configuration:
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpsSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="oleg"/>
<property name="password" value="password"/>
<property name="clientMode" value="1"/>
<property name="fileType" value="2"/>
<property name="useClientMode" value="true"/>
<property name="cipherSuites" value="a,b.c"/>
<property name="keyManager" ref="keyManager"/>
<property name="protocol" value="SSL"/>
<property name="trustManager" ref="trustManager"/>
<property name="prot" value="P"/>
<property name="needClientAuth" value="true"/>
<property name="authValue" value="oleg"/>
<property name="sessionCreation" value="true"/>
<property name="protocols" value="SSL, TLS"/>
<property name="implicit" value="true"/>
</bean>
If you experience connectivity problems and would like to trace session creation as well as see which sessions are polled, you can enable session tracing by setting the logger to the TRACE level (for example, log4j.category.org.springframework.integration.file=TRACE ).
|
Now you need only inject these session factories into your adapters. The protocol (FTP or FTPS) that an adapter uses depends on the type of session factory that has been injected into the adapter.
A more practical way to provide values for FTP or FTPS session factories is to use Spring’s property placeholder support (See https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-placeholderconfigurer). |
Advanced Configuration
DefaultFtpSessionFactory
provides an abstraction over the underlying client API, which (since Spring Integration 2.0) is Apache Commons Net.
This spares you from the low-level configuration details of the org.apache.commons.net.ftp.FTPClient
.
Several common properties are exposed on the session factory (since version 4.0, this now includes connectTimeout
, defaultTimeout
, and dataTimeout
).
However, you sometimes need access to lower level FTPClient
configuration to achieve more advanced configuration (such as setting the port range for active mode).
For that purpose, AbstractFtpSessionFactory
(the base class for all FTP Session Factories) exposes hooks, in the form of the two post-processing methods shown in the following listing:
/**
* Will handle additional initialization after client.connect() method was invoked,
* but before any action on the client has been taken
*/
protected void postProcessClientAfterConnect(T t) throws IOException {
// NOOP
}
/**
* Will handle additional initialization before client.connect() method was invoked.
*/
protected void postProcessClientBeforeConnect(T client) throws IOException {
// NOOP
}
As you can see, there is no default implementation for these two methods.
However, by extending DefaultFtpSessionFactory
, you can override these methods to provide more advanced configuration of the FTPClient
, as the following example shows:
public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory {
protected void postProcessClientBeforeConnect(FTPClient ftpClient) throws IOException {
ftpClient.setActivePortRange(4000, 5000);
}
}
FTPS and Shared SSLSession
When using FTP over SSL or TLS, some servers require the same SSLSession
to be used on the control and data connections.
This is to prevent “stealing” data connections.
See https://scarybeastsecurity.blogspot.cz/2009/02/vsftpd-210-released.html for more information.
Currently, the Apache FTPSClient does not support this feature. See NET-408.
The following solution, courtesy of Stack Overflow, uses reflection on the sun.security.ssl.SSLSessionContextImpl
, so it may not work on other JVMs.
The stack overflow answer was submitted in 2015, and the solution has been tested by the Spring Integration team recently on JDK 1.8.0_112.
The following example shows how to create an FTPS session:
@Bean
public DefaultFtpsSessionFactory sf() {
DefaultFtpsSessionFactory sf = new DefaultFtpsSessionFactory() {
@Override
protected FTPSClient createClientInstance() {
return new SharedSSLFTPSClient();
}
};
sf.setHost("...");
sf.setPort(21);
sf.setUsername("...");
sf.setPassword("...");
sf.setNeedClientAuth(true);
return sf;
}
private static final class SharedSSLFTPSClient extends FTPSClient {
@Override
protected void _prepareDataSocket_(final Socket socket) throws IOException {
if (socket instanceof SSLSocket) {
// Control socket is SSL
final SSLSession session = ((SSLSocket) _socket_).getSession();
final SSLSessionContext context = session.getSessionContext();
context.setSessionCacheSize(0); // you might want to limit the cache
try {
final Field sessionHostPortCache = context.getClass()
.getDeclaredField("sessionHostPortCache");
sessionHostPortCache.setAccessible(true);
final Object cache = sessionHostPortCache.get(context);
final Method method = cache.getClass().getDeclaredMethod("put", Object.class,
Object.class);
method.setAccessible(true);
String key = String.format("%s:%s", socket.getInetAddress().getHostName(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
key = String.format("%s:%s", socket.getInetAddress().getHostAddress(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
}
catch (NoSuchFieldException e) {
// Not running in expected JRE
logger.warn("No field sessionHostPortCache in SSLSessionContext", e);
}
catch (Exception e) {
// Not running in expected JRE
logger.warn(e.getMessage());
}
}
}
}
Delegating Session Factory
Version 4.2 introduced the DelegatingSessionFactory
, which allows the selection of the actual session factory at runtime.
Prior to invoking the FTP endpoint, call setThreadKey()
on the factory to associate a key with the current thread.
That key is then used to lookup the actual session factory to be used.
You can clear the key by calling clearThreadKey()
after use.
We added convenience methods so that you can easily do use a delegating session factory from a message flow.
The following example shows how to declare a delegating session factory:
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
<int-ftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />
When you use session caching (see FTP Session Caching), each of the delegates should be cached.
You cannot cache the DelegatingSessionFactory itself.
|
Starting with version 5.0.7, the DelegatingSessionFactory
can be used in conjunction with a RotatingServerAdvice
to poll multiple servers; see Inbound Channel Adapters: Polling Multiple Servers and Directories.
FTP Inbound Channel Adapter
The FTP inbound channel adapter is a special listener that connects to the FTP server and listens for the remote directory events (for example, new file created) at which point it initiates a file transfer.
The following example shows how to configure an inbound-channel-adapter
:
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
As the preceding configuration shows, you can configure an FTP inbound channel adapter by using the inbound-channel-adapter
element while also providing values for various attributes, such as local-directory
, filename-pattern
(which is based on simple pattern matching, not regular expressions), and the reference to a session-factory
.
By default, the transferred file carries the same name as the original file.
If you want to override this behavior, you can set the local-filename-generator-expression
attribute, which lets you provide a SpEL expression to generate the name of the local file.
Unlike outbound gateways and adapters, where the root object of the SpEL evaluation context is a Message
, this inbound adapter does not yet have the message at the time of evaluation, since that’s what it ultimately generates with the transferred file as its payload.
Consequently, the root object of the SpEL evaluation context is the original name of the remote file (a String
).
The inbound channel adapter first retrieves the File
object for a local directory and then emits each file according to the poller configuration.
Starting with version 5.0, you can now limit the number of files fetched from the FTP server when new file retrievals are needed.
This can be beneficial when the target files are very large or when you run in a clustered system with a persistent file list filter, discussed later.
Use max-fetch-size
for this purpose.
A negative value (the default) means no limit and all matching files are retrieved.
See Inbound Channel Adapters: Controlling Remote File Fetching for more information.
Since version 5.0, you can also provide a custom DirectoryScanner
implementation to the inbound-channel-adapter
by setting the scanner
attribute.
Starting with Spring Integration 3.0, you can specify the preserve-timestamp
attribute (its default is false
).
When true
, the local file’s modified timestamp is set to the value retrieved from the server.
Otherwise, it is set to the current time.
Starting with version 4.2, you can specify remote-directory-expression
instead of remote-directory
, letting you dynamically determine the directory on each poll — for example, remote-directory-expression="@myBean.determineRemoteDir()"
.
Starting with version 4.3, you can omit the remote-directory
and remote-directory-expression
attributes.
They default to null
.
In this case, according to the FTP protocol, the client working directory is used as the default remote directory.
Sometimes, file filtering based on the simple pattern specified with the filename-pattern
attribute might not suffice.
If this is the case, you can use the filename-regex
attribute to specify a regular expression (such as filename-regex=".*\.test$"
).
Also, if you need complete control, you can use the filter
attribute and provide a reference to any custom implementation of the o.s.i.file.filters.FileListFilter
, a strategy interface for filtering a list of files.
This filter determines which remote files are retrieved.
You can also combine a pattern-based filter with other filters (such as an AcceptOnceFileListFilter
to avoid synchronizing files that have previously been fetched) by using a CompositeFileListFilter
.
The AcceptOnceFileListFilter
stores its state in memory.
If you wish the state to survive a system restart, consider using the FtpPersistentAcceptOnceFileListFilter
instead.
This filter stores the accepted file names in an instance of the MetadataStore
strategy (see Metadata Store).
This filter matches on the filename and the remote modified time.
Since version 4.0, this filter requires a ConcurrentMetadataStore
.
When used with a shared data store (such as Redis
with the RedisMetadataStore
), it lets filter keys be shared across multiple application or server instances.
Starting with version 5.0, the FtpPersistentAcceptOnceFileListFilter
with in-memory SimpleMetadataStore
is applied by default for the FtpInboundFileSynchronizer
.
This filter is also applied with the regex
or pattern
option in the XML configuration as well as with FtpInboundChannelAdapterSpec
in the Java DSL.
Any other use cases can be managed with CompositeFileListFilter
(or ChainFileListFilter
).
The preceding discussion refers to filtering the files before retrieving them.
Once the files have been retrieved, an additional filter is applied to the files on the file system.
By default, this is an AcceptOnceFileListFilter
which, as discussed earlier, retains state in memory and does not consider the file’s modified time.
Unless your application removes files after processing, the adapter will re-process the files on disk by default after an application restart.
Also, if you configure the filter
to use a FtpPersistentAcceptOnceFileListFilter
and the remote file timestamp changes (causing it to be re-fetched), the default local filter does not let this new file be processed.
For more information about this filter, and how it is used, see Remote Persistent File List Filters.
You can use the local-filter
attribute to configure the behavior of the local file system filter.
Starting with version 4.3.8, a FileSystemPersistentAcceptOnceFileListFilter
is configured by default.
This filter stores the accepted file names and modified timestamp in an instance of the MetadataStore
strategy (see Metadata Store) and detects changes to the local file modified time.
The default MetadataStore
is a SimpleMetadataStore
, which stores state in memory.
Since version 4.1.5, these filters have a new property (flushOnUpdate
) that causes them to flush the
metadata store on every update (if the store implements Flushable
).
Further, if you use a distributed MetadataStore (such as Redis or GemFire), you can have multiple instances of the same adapter or application and be sure that each file is processed only once.
|
The actual local filter is a CompositeFileListFilter
that contains the supplied filter and a pattern filter that prevents processing files that are in the process of being downloaded (based on the temporary-file-suffix
).
Files are downloaded with this suffix (the default is .writing
), and the file is renamed to its final name when the transfer is complete, making it 'visible' to the filter.
The remote-file-separator
attribute lets you configure a file separator character to use if the default '/' is not applicable for your particular environment.
See the schema for more details on these attributes.
You should also understand that the FTP inbound channel adapter is a polling consumer.
Therefore, you must configure a poller (by using either a global default or a local sub-element).
Once a file has been transferred, a message with a java.io.File
as its payload is generated and sent to the channel identified by the channel
attribute.
More on File Filtering and Incomplete Files
Sometimes the file that just appeared in the monitored (remote) directory is not complete.
Typically, such a file is written with a temporary extension (such as somefile.txt.writing
) and is then renamed once the writing process finishes.
In most cases, you are only interested in files that are complete and would like to filter for only files that are complete.
To handle these scenarios, you can use the filtering support provided by the filename-pattern
, filename-regex
, and filter
attributes.
The following example uses a custom filter implementation:
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
Poller Configuration Notes for the Inbound FTP Adapter
The job of the inbound FTP adapter consists of two tasks:
-
Communicate with a remote server in order to transfer files from a remote directory to a local directory.
-
For each transferred file, generate a message with that file as a payload and send it to the channel identified by the 'channel' attribute. That is why they are called "'channel adapters'" rather than just "'adapters'". The main job of such an adapter is to generate a message to send to a message channel. Essentially, the second task takes precedence in such a way that, if your local directory already has one or more files, it first generates messages from those. Only when all local files have been processed does it initiate the remote communication to retrieve more files.
Also, when configuring a trigger on the poller, you should pay close attention to the max-messages-per-poll
attribute.
Its default value is 1
for all SourcePollingChannelAdapter
instances (including FTP).
This means that, as soon as one file is processed, it waits for the next execution time as determined by your trigger configuration.
If you happened to have one or more files sitting in the local-directory
, it would process those files before it would initiate communication with the remote FTP server.
Also, if the max-messages-per-poll
is set to 1
(the default), it processes only one file at a time with intervals as defined by your trigger, essentially working as “one-poll === one-file”.
For typical file-transfer use cases, you most likely want the opposite behavior: to process all the files you can for each poll and only then wait for the next poll.
If that is the case, set max-messages-per-poll
to -1.
Then, on each poll, the adapter tries to generate as many messages as it possibly can.
In other words, it processes everything in the local directory, and then it connects to the remote directory to transfer everything that is available there to be processed locally.
Only then is the poll operation considered complete, and the poller waits for the next execution time.
You can alternatively set the 'max-messages-per-poll' value to a positive value that indicates the upward limit of messages to be created from files with each poll.
For example, a value of 10
means that, on each poll, it tries to process no more than ten files.
Recovering from Failures
It is important to understand the architecture of the adapter.
There is a file synchronizer that fetches the files and a FileReadingMessageSource
that emits a message for each
synchronized file.
As discussed earlier, two filters are involved.
The filter
attribute (and patterns) refers to the remote (FTP) file list, to avoid fetching files that have already
been fetched.
The local-filter
is used by the FileReadingMessageSource
to determine which files are to be sent as messages.
The synchronizer lists the remote files and consults its filter.
The files are then transferred.
If an IO error occurs during file transfer, any files that have already been added to the filter are removed so that they
are eligible to be re-fetched on the next poll.
This only applies if the filter implements ReversibleFileListFilter
(such as the AcceptOnceFileListFilter
).
If, after synchronizing the files, an error occurs on the downstream flow processing a file, no automatic rollback of the filter occurs, so the failed file is not reprocessed by default.
If you wish to reprocess such files after a failure, you can use configuration similar to the following to facilitate the removal of the failed file from the filter:
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
The preceding configuration works for any ResettableFileListFilter
.
Starting with version 5.0, the inbound channel adapter can build sub-directories locally that correspond to the generated local file name.
That can be a remote sub-path as well.
To be able to read a local directory recursively for modification according to the hierarchy support, you can now supply an internal FileReadingMessageSource
with a new RecursiveDirectoryScanner
based on the Files.walk()
algorithm.
See AbstractInboundFileSynchronizingMessageSource.setScanner()
for more information.
Also, you can now switch the AbstractInboundFileSynchronizingMessageSource
to the WatchService
-based DirectoryScanner
by using setUseWatchService()
option.
It is also configured for all the WatchEventType
instances to react to any modifications in local directory.
The reprocessing sample shown earlier is based on the built-in functionality of the FileReadingMessageSource.WatchServiceDirectoryScanner
to perform ResettableFileListFilter.remove()
when the file is deleted (StandardWatchEventKinds.ENTRY_DELETE
) from the local directory.
See WatchServiceDirectoryScanner
for more information.
Configuring with Java Configuration
The following Spring Boot application show an example of how to configure the inbound adapter with Java configuration:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Fpt.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
Dealing With Incomplete Data
The FtpSystemMarkerFilePresentFileListFilter
is provided to filter remote files that do not have a corresponding marker file on the remote system.
See the Javadoc (and browse to the parent classes) for configuration information.
FTP Streaming Inbound Channel Adapter
Version 4.3 introduced the streaming inbound channel adapter.
This adapter produces message with payloads of type InputStream
, letting files be fetched without writing to the
local file system.
Since the session remains open, the consuming application is responsible for closing the session when the file has been
consumed.
The session is provided in the closeableResource
header (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
).
Standard framework components, such as the FileSplitter
and StreamTransformer
, automatically close the session.
See File Splitter and Stream Transformer for more information about these components.
The following example shows how to configure an inbound-streaming-channel-adapter
:
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
Only one of filename-pattern
, filename-regex
, filter
, or filter-expression
is allowed.
Starting with version 5.0, by default, the FtpStreamingMessageSource adapter prevents duplicates for remote files with FtpPersistentAcceptOnceFileListFilter based on the in-memory SimpleMetadataStore .
By default, this filter is also applied with the filename pattern (or regex).
If you need to allow duplicates, you can use AcceptAllFileListFilter .
Any other use cases can be handled by CompositeFileListFilter (or ChainFileListFilter ).
The Java configuration (later in the document) shows one technique to remove the remote file after processing to avoid duplicates.
|
For more information about the FtpPersistentAcceptOnceFileListFilter
, and how it is used, see Remote Persistent File List Filters.
Use the max-fetch-size
attribute to limit the number of files fetched on each poll when a fetch is necessary.
Set it to 1
and use a persistent filter when running in a clustered environment.
See Inbound Channel Adapters: Controlling Remote File Fetching for more information.
The adapter puts the remote directory and file name in the FileHeaders.REMOTE_DIRECTORY
and FileHeaders.REMOTE_FILE
headers, respectively.
Starting with version 5.0, the FileHeaders.REMOTE_FILE_INFO
header provides additional remote file information (represented in JSON by default).
If you set the fileInfoJson
property on the FtpStreamingMessageSource
to false
, the header contains an FtpFileInfo
object.
The FTPFile
object provided by the underlying Apache Net library can be accessed by using the FtpFileInfo.getFileInfo()
method.
The fileInfoJson
property is not available when you use XML configuration, but you can set it by injecting the FtpStreamingMessageSource
into one of your configuration classes.
See also Remote File Information.
Starting with version 5.1, the generic type of the comparator
is FTPFile
.
Previously, it was AbstractFileInfo<FTPFile>
.
This is because the sort is now performed earlier in the processing, before filtering and applying maxFetch
.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
Notice that, in this example, the message handler downstream of the transformer has an advice that removes the remote file after processing.
Inbound Channel Adapters: Polling Multiple Servers and Directories
Starting with version 5.0.7, the RotatingServerAdvice
is available; when configured as a poller advice, the inbound adapters can poll multiple servers and directories.
Configure the advice and add it to the poller’s advice chain as normal.
A DelegatingSessionFactory
is used to select the server see Delegating Session Factory for more information.
The advice configuration consists of a list of RotationPolicy.KeyDirectory
objects.
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
This advice will poll directory foo
on server one
until no new files exist then move to directory bar
and then directory baz
on server two
, etc.
This default behavior can be modified with the fair
constructor arg:
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
In this case, the advice will move to the next server/directory regardless of whether the previous poll returned a file.
Alternatively, you can provide your own RotationPolicy
to reconfigure the message source as needed:
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
and
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
The local-filename-generator-expression
attribute (localFilenameGeneratorExpression
on the synchronizer) can now contain the #remoteDirectory
variable.
This allows files retrieved from different directories to be downloaded to similar directories locally:
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Ftp.inboundAdapter(sf())
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
Do not configure a TaskExecutor on the poller when using this advice; see Conditional Pollers for Message Sources for more information.
|
Inbound Channel Adapters: Controlling Remote File Fetching
There are two properties that you should consider when you configure inbound channel adapters.
max-messages-per-poll
, as with all pollers, can be used to limit the number of messages emitted on each poll (if more than the configured value are ready).
max-fetch-size
(since version 5.0) can limit the number of files retrieved from the remote server at one time.
The following scenarios assume the starting state is an empty local directory:
-
max-messages-per-poll=2
andmax-fetch-size=1
: The adapter fetches one file, emits it, fetches the next file, emits it, and then sleeps until the next poll. -
max-messages-per-poll=2
andmax-fetch-size=2
): The adapter fetches both files and then emits each one. -
max-messages-per-poll=2
andmax-fetch-size=4
: The adapter fetches up to four files (if available) and emits the first two (if there are at least two). The next two files are emitted on the next poll. -
max-messages-per-poll=2
andmax-fetch-size
not specified: The adapter fetches all remote files and emits the first two (if there are at least two). The subsequent files are emitted on subsequent polls (two at a time). When all files are consumed, the remote fetch is attempted again, to pick up any new files.
When you deploy multiple instances of an application, we recommend a small max-fetch-size , to avoid one instance “grabbing” all the files and starving other instances.
|
Another use for max-fetch-size
is if you want to stop fetching remote files but continue to process files that have already been fetched.
Setting the maxFetchSize
property on the MessageSource
(programmatically, with JMX, or with a control bus) effectively stops the adapter from fetching more files but lets the poller continue to emit messages for files that have previously been fetched.
If the poller is active when the property is changed, the change takes effect on the next poll.
Starting with version 5.1, the synchronizer can be provided with a Comparator<FTPFile>
.
This is useful when restricting the number of files fetched with maxFetchSize
.
FTP Outbound Channel Adapter
The FTP outbound channel adapter relies on a MessageHandler
implementation that connects to the FTP server and initiates an FTP transfer for every file it receives in the payload of incoming messages.
It also supports several representations of a file, so you are not limited only to java.io.File
-typed payloads.
The FTP outbound channel adapter supports the following payloads:
-
java.io.File
: The actual file object -
byte[]
: A byte array that represents the file contents -
java.lang.String
: Text that represents the file contents.
The following example shows how to configure an outbound-channel-adapter
:
<int-ftp:outbound-channel-adapter id="ftpOutbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
charset="UTF-8"
remote-file-separator="/"
auto-create-directory="true"
remote-directory-expression="headers['remote_dir']"
temporary-remote-directory-expression="headers['temp_remote_dir']"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>
The preceding configuration shows how you can configure an FTP outbound channel adapter by using the outbound-channel-adapter
element while also providing values for various attributes, such as filename-generator
(an implementation of the o.s.i.file.FileNameGenerator
strategy interface), a reference to a session-factory
, and other attributes.
You can also see some examples of *expression
attributes that let you use SpEL to configure settings such as remote-directory-expression
, temporary-remote-directory-expression
, and remote-filename-generator-expression
(a SpEL alternative to filename-generator
, shown in the preceding example).
As with any component that allows the usage of SpEL, access to the payload and the message Headers is available through the 'payload' and 'headers' variables.
See the schema for more details on the available attributes.
By default, if no file name generator is specified, Spring Integration uses o.s.i.file.DefaultFileNameGenerator .
DefaultFileNameGenerator determines the file name based on the value of the file_name header (if it exists) in the MessageHeaders , or, if the payload of the Message is already a java.io.File , it uses the original name of that file.
|
Defining certain values (such as remote-directory ) might be platform- or FTP server-dependent.
For example, as was reported on https://forum.spring.io/showthread.php?p=333478&posted=1#post333478, on some platforms, you must add a slash to the end of the directory definition (for example, remote-directory="/thing1/thing2/" instead of remote-directory="/thing1/thing2" ).
|
Starting with version 4.1, you can specify the mode
when transferring the file.
By default, an existing file is overwritten.
The modes are defined by the FileExistsMode
enumeration, which includes the following values:
-
REPLACE
(default) *APPEND
-
IGNORE
-
FAIL
IGNORE
and FAIL
do not transfer the file.
FAIL
causes an exception to be thrown, while IGNORE
silently ignores the transfer (although a DEBUG
log entry is produced).
Version 5.2 introduced the chmod
attribute, which you can use to change the remote file permissions after upload.
You can use the conventional Unix octal format (for example, 600
allows read-write for the file owner only).
When configuring the adapter using java, you can use setChmodOctal("600")
or setChmod(0600)
.
Only applies if your FTP server supports the SITE CHMOD
subcommand.
Avoiding Partially Written Files
One of the common problems that arises when dealing with file transfers is the possibility of processing a partial file. That is, a file might appear in the file system before its transfer is actually complete.
To deal with this issue, Spring Integration FTP adapters use a common algorithm: Files are transferred under a temporary name and then renamed once they are fully transferred.
By default, every file that is in the process of being transferred appears in the file system with an additional suffix, which, by default, is .writing
.
You can change this suffix by setting the temporary-file-suffix
attribute.
However, there may be situations where you do not want to use this technique (for example, if the server does not permit renaming files).
For situations like this, you can disable this feature by setting use-temporary-file-name
to false
(the default is true
).
When this attribute is false
, the file is written with its final name and the consuming application needs some other mechanism to detect that the file is completely uploaded before accessing it.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the outbound adapter with Java configuration:
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpMessageHandler handler = new FtpMessageHandler(ftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}
});
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the outbound adapter using the Java DSL:
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public IntegrationFlow ftpOutboundFlow() {
return IntegrationFlows.from("toFtpChannel")
.handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
.useTemporaryFileName(false)
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
.remoteDirectory(this.ftpServer.getTargetFtpDirectory().getName())
).get();
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
FTP Outbound Gateway
The FTP outbound gateway provides a limited set of commands to interact with a remote FTP or FTPS server. The supported commands are:
-
ls
(list files) -
nlst
(list file names) -
get
(retrieve file) -
mget
(retrieve file(s)) -
rm
(remove file(s)) -
mv
(move/rename file) -
put
(send file) -
mput
(send multiple files)
Using the ls
Command
ls
lists remote files and supports the following options:
-
-1
: Retrieve a list of file names. The default is to retrieve a list ofFileInfo
objects. -
-a
: Include all files (including those starting with '.') -
-f
: Do not sort the list -
-dirs
: Include directories (they are excluded by default) -
-links
: Include symbolic links (they are excluded by default) -
-R
: List the remote directory recursively
In addition, filename filtering is provided, in the same manner as the inbound-channel-adapter
.
See FTP Inbound Channel Adapter.
The message payload resulting from an ls
operation is a list of file names or a list of FileInfo
objects.
These objects provide information such as modified time, permissions, and other details.
The remote directory that the ls
command acted on is provided in the file_remoteDirectory
header.
When using the recursive option (-R
), the fileName
includes any subdirectory elements, representing a relative path to the file (relative to the remote directory).
If the -dirs
option is included, each recursive directory is also returned as an element in the list.
In this case, it is recommended that you not use the -1
option, because you would not be able to distinguish files from directories, which you can do with the FileInfo
objects.
Starting with version 4.3, the FtpSession
supports null
for the list()
and listNames()
methods.
Therefore, you can omit the expression
attribute.
For convenience, Java configuration has two constructors that do not have an expression
argument.
or LS
, NLST
, PUT
and MPUT
commands, null
is treated as the client working directory, according to the FTP protocol.
All other commands must be supplied with the expression
to evaluate the remote path against the request message.
You can set the working directory with the FTPClient.changeWorkingDirectory()
function when you extend the DefaultFtpSessionFactory
and implement the postProcessClientAfterConnect()
callback.
Using the nlst
Command
Version 5 introduced support for the nlst
command.
nlst
lists remote file names and supports only one option:
-
-f
: Do not sort the list
The message payload resulting from an nlst
operation is a list of file names.
The remote directory that the nlst
command acted on is provided in the file_remoteDirectory
header.
Unlike the -1
option for the ls
command, which uses the LIST
command, the nlst
command sends an NLST
command to the target FTP server.
This command is useful when the server does not support LIST
(due to security restrictions, for example).
The result of the nlst
operation is the names without other detail.
Therefore, the framework cannot determine if an entity is a directory, to perform filtering or recursive listing, for example.
Using the get
Command
get
retrieves a remote file.
It supports the following option:
-
-P
: Preserve the timestamp of the remote file. -
-stream
: Retrieve the remote file as a stream. -
-D
: Delete the remote file after successful transfer. The remote file is not deleted if the transfer is ignored, because theFileExistsMode
isIGNORE
and the local file already exists.
The file_remoteDirectory
header provides the remote directory name, and the file_remoteFile
header provides the file name.
The message payload resulting from a get
operation is a File
object that represents the retrieved file or an InputStream
when you use the -stream
option.
The -stream
option allows retrieving the file as a stream.
For text files, a common use case is to combine this operation with a file splitter or a stream transformer.
When consuming remote files as streams, you are responsible for closing the Session
after the stream is consumed.
For convenience, the Session
is provided in the closeableResource
header, which you can access with a convenience method on IntegrationMessageHeaderAccessor
The following example shows how to use the convenience method:
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
Framework components such as the file splitter and the stream transformer automatically close the session after the data is transferred.
The following example shows how to consume a file as a stream:
<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />
<int-file:splitter input-channel="stream" output-channel="lines" />
If you consume the input stream in a custom component, you must close the Session .
You can do so either in your custom code or by routing a copy of the message to a service-activator and using SpEL, as the following example shows:
|
<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />
Using the mget
Command
mget
retrieves multiple remote files based on a pattern and supports the following options:
-
-P
: Preserve the timestamps of the remote files. -
-R
: Retrieve the entire directory tree recursively. -
-x
: Throw an exception if no files match the pattern (otherwise an empty list is returned). -
-D
: Delete each remote file after successful transfer. The remote file is not deleted if the transfer is ignored, because theFileExistsMode
isIGNORE
and the local file already exists.
The message payload resulting from an mget
operation is a List<File>
object (that is, a List
of File
objects, each representing a retrieved file).
Starting with version 5.0, if the FileExistsMode is IGNORE , the payload of the output message no longer contains files that were not fetched due to the file already existing.
Previously, the list contained all files, including those that already existed.
|
The expression used to determine the remote path should produce a result that ends with - e.g.
somedir/
will fetch the complete tree under somedir
.
Starting with version 5.0, a recursive mget
, combined with the new FileExistsMode.REPLACE_IF_MODIFIED
mode, can be used to periodically synchronize an entire remote directory tree locally.
This mode replaces the local file’s last modified timestamp with the remote timestamp, regardless of the -P
(preserve timestamp) option.
Using recursion (
-R )The pattern is ignored, and If a subdirectory is filtered, no additional traversal of that subdirectory is performed. The Typically, you would use the |
Starting with version 5.0, the FtpSimplePatternFileListFilter
and FtpRegexPatternFileListFilter
can be configured to always pass directories by setting the alwaysAcceptDirectories
property to true
.
Doing so allows recursion for a simple pattern, as the following examples show:
<bean id="starDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
Once you have defined filters such as those in the preceding example, you can use one by setting the filter
property on the gateway.
Using the put
Command
The put
commad sends a file to the remote server.
The payload of the message can be a java.io.File
, a byte[]
, or a String
.
A remote-filename-generator
(or expression) is used to name the remote file.
Other available attributes include remote-directory
, temporary-remote-directory
, and their *-expression
equivalents: use-temporary-file-name
and auto-create-directory
.
See the schema documentation for more information.
The message payload resulting from a put
operation is a String
that represents the full path of the file on the server after transfer.
Version 5.2 introduced the chmod
attribute, which changes the remote file permissions after upload.
You can use the conventional Unix octal format (for example, 600
allows read-write for the file owner only).
When configuring the adapter using java, you can use setChmod(0600)
.
Only applies if your FTP server supports the SITE CHMOD
subcommand.
Using the mput
Command
The mput
sends multiple files to the server and supports only one option:
-
-R
: Recursive. Send all files (possibly filtered) in the directory and its subdirectories.
The message payload must be a java.io.File
(or String
) that represents a local directory.
Since version 5.1, a collection of File
or String
is also supported.
This command supports the same attributes as the put
command.
In addition, files in the local directory can be filtered with one of mput-pattern
, mput-regex
, mput-filter
, or mput-filter-expression
.
The filter works with recursion, as long as the subdirectories themselves pass the filter.
Subdirectories that do not pass the filter are not recursed.
The message payload resulting from an mget
operation is a List<String>
object (that is, a List
of remote file paths that result from the transfer).
Version 5.2 introduced the chmod
attribute, which lets you change the remote file permissions after upload.
You can use the conventional Unix octal format (for example, 600
allows read-write for the file owner only).
When configuring the adapter with Java, you can use setChmodOctal("600")
or setChmod(0600)
.
Only applies if your FTP server supports the SITE CHMOD
subcommand.
Using the rm
Command
The rm
command removes files.
The rm
command has no options.
The message payload resulting from an rm
operation is Boolean.TRUE
if the remove was successful or Boolean.FALSE
otherwise.
The file_remoteDirectory
header provides the remote directory, and the file_remoteFile
header provides the file name.
Using the mv
Command
The mv
command moves files
The mv
command has no options.
The expression
attribute defines the “from” path and the rename-expression
attribute defines the “to” path.
By default, the rename-expression
is headers['file_renameTo']
.
This expression must not evaluate to null or an empty String
.
If necessary, any necessary remote directories are created.
The payload of the result message is Boolean.TRUE
.
The file_remoteDirectory
header provides the original remote directory, and file_remoteFile
header provides the file name.
The new path is in the file_renameTo
header.
Additional Information about FTP Outbound Gateway Commands
The get
and mget
commands support the local-filename-generator-expression
attribute.
It defines a SpEL expression to generate the name of local files during the transfer.
The root object of the evaluation context is the request message.
The remoteFileName
variable, which is particularly useful for mget
, is also available — for example, local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.something"
.
The get
and mget
commands support the local-directory-expression
attribute.
It defines a SpEL expression to generate the name of local directories during the transfer.
The root object of the evaluation context is the request message but.
The remoteDirectory
variable, which is particularly useful for mget
, is also available — for example: local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.something"
.
This attribute is mutually exclusive with the local-directory
attribute.
For all commands, the 'expression' property of the gateway provides the path on which the command acts.
For the mget
command, the expression might evaluate to '', meaning to retrieve all files, or 'somedirectory/', and so on.
The following example shows a gateway configured for an ls
command:
<int-ftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>
The payload of the message sent to the toSplitter
channel is a list of String
objects that each contain the name of a file.
If the command-options
attribute was omitted, it holds FileInfo
objects.
It uses space-delimited options — for example, command-options="-1 -dirs -links"
.
Starting with version 4.2, the GET
, MGET
, PUT
and MPUT
commands support a FileExistsMode
property (mode
when using the namespace support).
This affects the behavior when the local file exists (GET
and MGET
) or the remote file exists (PUT
and MPUT
).
Supported modes are REPLACE
, APPEND
, FAIL
, and IGNORE
.
For backwards compatibility, the default mode for PUT
and MPUT
operations is REPLACE
.
For GET
and MGET
operations, the default is FAIL
.
Starting with version 5.0, the setWorkingDirExpression()
(working-dir-expression
in XML) option is provided on the FtpOutboundGateway
(<int-ftp:outbound-gateway>
in XML).
It lets you change the client working directory at runtime.
The expression is evaluated against the request message.
The previous working directory is restored after each gateway operation.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the outbound gateway with Java configuration:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpOutboundGateway ftpOutboundGateway =
new FtpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
ftpOutboundGateway.setOutputChannelName("lsReplyChannel");
return ftpOutboundGateway;
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the outbound gateway with the Java DSL:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpOutboundGatewaySpec ftpOutboundGateway() {
return Ftp.outboundGateway(ftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subFtpSource|.*1.txt)")
.localDirectoryExpression("'localDirectory/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('ftpSource', 'localTarget')");
}
@Bean
public IntegrationFlow ftpMGetFlow(AbstractRemoteFileOutboundGateway<FTPFile> ftpOutboundGateway) {
return f -> f
.handle(ftpOutboundGateway)
.channel(c -> c.queue("remoteFileOutputChannel"));
}
}
Outbound Gateway Partial Success (mget
and mput
)
When you perform operations on multiple files (by using mget
and mput
), an exception can occur some time after one or more files have been transferred.
In this case (starting with version 4.2), a PartialSuccessException
is thrown.
As well as the usual MessagingException
properties (failedMessage
and cause
), this exception has two additional
properties:
-
partialResults
: The successful transfer results. -
derivedInput
: The list of files generated from the request message (for example, local files to transfer for anmput
).
These attributes let you determine which files were successfully transferred and which were not.
In the case of a recursive mput
, the PartialSuccessException
may have nested PartialSuccessException
occurrences.
Consider the following directory structure:
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
If the exception occurs on file3.txt
, the PartialSuccessException
thrown by the gateway has derivedInput
of file1.txt
, subdir
, and zoo.txt
and partialResults
of file1.txt
.
Its cause
is another PartialSuccessException
with derivedInput
of file2.txt
and file3.txt
and
partialResults
of file2.txt
.
FTP Session Caching
Starting with Spring Integration 3.0, sessions are no longer cached by default.
The cache-sessions attribute is no longer supported on endpoints.
You must use a CachingSessionFactory (shown in the next example) if you wish to cache sessions.
|
In versions prior to 3.0, the sessions were automatically cached by default.
A cache-sessions
attribute was available for disabling the auto caching, but that solution did not provide a way to configure other session caching attributes.
For example, you could not limit the number of sessions created.
To support that requirement and other configuration options, a CachingSessionFactory
was added.
It provides sessionCacheSize
and sessionWaitTimeout
properties.
The sessionCacheSize
property controls how many active sessions the factory maintains in its cache (the default is unbounded).
If the sessionCacheSize
threshold has been reached, any attempt to acquire another session blocks until either one of the cached sessions becomes available or until the wait time for a session expires (the default wait time is Integer.MAX_VALUE
).
The sessionWaitTimeout
property configures that value.
If you want your sessions to be cached, configure your default session factory as described earlier and then wrap it in an instance of CachingSessionFactory
, where you can provide those additional properties.
The following example shows how to do so:
<bean id="ftpSessionFactory" class="o.s.i.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory" class="o.s.i.file.remote.session.CachingSessionFactory">
<constructor-arg ref="ftpSessionFactory"/>
<constructor-arg value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>
The preceding example shows a CachingSessionFactory
created with the sessionCacheSize
set to 10
and the
sessionWaitTimeout
set to one second (its value is in milliseconds).
Starting with Spring Integration 3.0, the CachingConnectionFactory
provides a resetCache()
method.
When invoked, all idle sessions are immediately closed and in-use sessions are closed when they are returned to the cache.
New requests for sessions establish new sessions as necessary.
Starting with version 5.1, the CachingSessionFactory
has a new property testSession
.
When true, the session will be tested by sending a NOOP command to ensure it is still active; if not, it will be removed from the cache; a new session is created if no active sessions are in the cache.
Using RemoteFileTemplate
Starting with Spring Integration 3.0, a new abstraction is provided over the FtpSession
object.
The template provides methods to send, retrieve (as an InputStream
), remove, and rename files.
In addition an execute
method is provided allowing the caller to execute multiple operations on the session.
In all cases, the template takes care of reliably closing the session.
For more information, see the
Javadoc for RemoteFileTemplate
.
There is a subclass for FTP: FtpRemoteFileTemplate
.
Version 4.1 added added additional methods, including getClientInstance()
, which provides access to the underlying FTPClient
and thus gives you access to low-level APIs.
Not all FTP servers properly implement the STAT <path>
command.
Some return a positive result for a non-existent path.
The NLST
command reliably returns the name when the path is a file and it exists.
However, this does not support checking that an empty directory exists since NLST
always returns an empty list when the path is a directory.
Since the template does not know whether the path represents a directory, it has to perform additional checks when the path does not appear to exist (when using NLST
).
This adds overhead, requiring several requests to the server.
Starting with version 4.1.9, the FtpRemoteFileTemplate
provides the FtpRemoteFileTemplate.ExistsMode
property, which has the following options:
-
STAT
: Perform theSTAT
FTP command (FTPClient.getStatus(path)
) to check the path existence. This is the default and requires that your FTP server properly support theSTAT
command (with a path). -
NLST
: Perform theNLST
FTP command —FTPClient.listName(path)
. Use this if you are testing for a path that is a full path to a file. It does not work for empty directories. -
NLST_AND_DIRS
: Perform theNLST
command first and, if it returns no files, fall back to a technique that temporarily switches the working directory by usingFTPClient.changeWorkingDirectory(path)
. SeeFtpSession.exists()
for more information.
Since we know that the FileExistsMode.FAIL
case is always only looking for a file (and not a directory), we safely use NLST
mode for the FtpMessageHandler
and FtpOutboundGateway
components.
For any other cases, the FtpRemoteFileTemplate
can be extended to implement custom logic in the overridden exist()
method.
Starting with version 5.0, the new RemoteFileOperations.invoke(OperationsCallback<F, T> action)
method is available.
This method lets several RemoteFileOperations
calls be called in the scope of the same, thread-bounded, Session
.
This is useful when you need to perform several high-level operations of the RemoteFileTemplate
as one unit of work.
For example, AbstractRemoteFileOutboundGateway
uses it with the mput
command implementation, where we perform a put
operation for each file in the provided directory and recursively for its sub-directories.
See the Javadoc for more information.
Using MessageSessionCallback
Starting with Spring Integration 4.2, you can use a MessageSessionCallback<F, T>
implementation with the
<int-ftp:outbound-gateway/>
(FtpOutboundGateway
in Java) to perform any operations on the Session<FTPFile>
with
the requestMessage
context.
It can be used for any non-standard or low-level FTP operations and allows access from an integration flow definition and functional interface (Lambda) implementation injection, as the following example shows:
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler ftpOutboundGateway(SessionFactory<FTPFile> sessionFactory) {
return new FtpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}
Another example might be to pre- or post-process the file data being sent or retrieved.
When using XML configuration, the <int-ftp:outbound-gateway/>
provides a session-callback
attribute to let you specify the MessageSessionCallback
bean name.
The session-callback is mutually exclusive with the command and expression attributes.
When configuring with Java, different constructors are available in the FtpOutboundGateway class.
|
Apache Mina FTP Server Events
The ApacheMinaFtplet
, added in version 5.2, listens for certain Apache Mina FTP server events and publishes them as ApplicationEvent
s which can be received by any ApplicationListener
bean, @EventListener
bean method, or Event Inbound Channel Adapter.
Currently supported events are:
-
SessionOpenedEvent
- a client session was opened -
DirectoryCreatedEvent
- a directory was created -
FileWrittenEvent
- a file was written to -
PathMovedEvent
- a file or directory was renamed -
PathRemovedEvent
- a file or directory was removed -
SessionClosedEvent
- the client has disconnected
Each of these is a subclass of ApacheMinaFtpEvent
; you can configure a single listener to receive all of the event types.
The source
property of each event is a FtpSession
, from which you can obtain information such as the client address; a convenient getSession()
method is provided on the abstract event.
Events other than session open/close have another property FtpRequest
which has properties such as the command and arguments.
To configure the server with the listener (which must be a Spring bean), add it to the server factory:
FtpServerFactory serverFactory = new FtpServerFactory();
...
ListenerFactory factory = new ListenerFactory();
...
serverFactory.addListener("default", factory.createListener());
serverFactory.setFtplets(new HashMap<>(Collections.singletonMap("springFtplet", apacheMinaFtpletBean)));
server = serverFactory.createServer();
server.start();
To consume these events using a Spring Integration event adapter:
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaFtpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
Remote File Information
Starting with version 5.2, the FtpStreamingMessageSource
(FTP Streaming Inbound Channel Adapter), FtpInboundFileSynchronizingMessageSource
(FTP Inbound Channel Adapter) and "read"-commands of the FtpOutboundGateway
(FTP Outbound Gateway) provide additional headers in the message to produce with an information about the remote file:
-
FileHeaders.REMOTE_HOST_PORT
- the host:port pair the remote session has been connected to during file transfer operation; -
FileHeaders.REMOTE_DIRECTORY
- the remote directory the operation has been performed; -
FileHeaders.REMOTE_FILE
- the remote file name; applicable only for single file operations.
Since the FtpInboundFileSynchronizingMessageSource
doesn’t produce messages against remote files, but using a local copy, the AbstractInboundFileSynchronizer
stores an information about remote file in the MetadataStore
(which can be configured externally) in the URI style (protocol://host:port/remoteDirectory#remoteFileName
) during synchronization operation.
This metadata is retrieved by the FtpInboundFileSynchronizingMessageSource
when local file is polled.
When local file is deleted, it is recommended to remove its metadata entry.
The AbstractInboundFileSynchronizer
provides a removeRemoteFileMetadata()
callback for this purpose.
In addition there is a setMetadataStorePrefix()
to be used in the metadata keys.
It is recommended to have this prefix be different from the one used in the MetadataStore
-based FileListFilter
implementations, when the same MetadataStore
instance is shared between these components, to avoid entry overriding because both filter and AbstractInboundFileSynchronizer
use the same local file name for the metadata entry key.