File Support
Spring Integration’s file support extends the Spring Integration core with a dedicated vocabulary to deal with reading, writing, and transforming files.
You need to include this dependency in your project (Maven first, then Gradle):
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>5.5.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-file:5.5.5"
It provides a namespace that enables elements defining channel adapters dedicated to files and support for transformers that can read file contents into strings or byte arrays.
This section explains the workings of FileReadingMessageSource
and FileWritingMessageHandler
and how to configure them as beans.
It also discusses the support for dealing with files through file-specific implementations of Transformer
.
Finally, it explains the file-specific namespace.
Reading Files
A FileReadingMessageSource
can be used to consume files from the filesystem.
This is an implementation of MessageSource
that creates messages from a file system directory.
The following example shows how to configure a FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
To prevent creating messages for certain files, you can supply a FileListFilter
.
By default, we use the following filters:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
The IgnoreHiddenFileListFilter
ensures that hidden files are not processed.
Note that the exact definition of hidden is system-dependent.
For example, on UNIX-based systems, a file beginning with a period character is considered to be hidden.
Microsoft Windows, on the other hand, has a dedicated file attribute to indicate hidden files.
Version 4.2 introduced the |
The AcceptOnceFileListFilter
ensures files are picked up only once from the directory.
The Since version 4.0, this filter requires a Since version 4.1.5, this filter has a new property ( |
The persistent file list filters now have a boolean property forRecursion
.
Setting this property to true
, also sets alwaysAcceptDirectories
, which means that the recursive operation on the outbound gateways (ls
and mget
) will now always traverse the full directory tree each time.
This is to solve a problem where changes deep in the directory tree were not detected.
In addition, forRecursion=true
causes the full path to files to be used as the metadata store keys; this solves a problem where the filter did not work properly if a file with the same name appears multiple times in different directories.
IMPORTANT: This means that existing keys in a persistent metadata store will not be found for files beneath the top level directory.
For this reason, the property is false
by default; this may change in a future release.
The following example configures a FileReadingMessageSource
with a filter:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"
p:filter-ref="customFilterBean"/>
A common problem with reading files is that a file may be detected before it is ready (that is, some other process may still be writing the file).
The default AcceptOnceFileListFilter
does not prevent this.
In most cases, this can be prevented if the file-writing process renames each file as soon as it is ready for reading.
A filename-pattern
or filename-regex
filter that accepts only files that are ready (perhaps based on a known suffix), composed with the default AcceptOnceFileListFilter
, allows for this situation.
The CompositeFileListFilter
enables the composition, as the following example shows:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="org.springframework.integration.file.filters.AcceptOnceFileListFilter"/>
<bean class="org.springframework.integration.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
If it is not possible to create the file with a temporary name and rename to the final name, Spring Integration provides another alternative.
Version 4.2 added the LastModifiedFileListFilter
.
This filter can be configured with an age
property so that only files older than this value are passed by the filter.
The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches).
The following example shows how to configure a LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
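The age check can be pictured with a few lines of plain Java. This is a minimal sketch, not the framework's LastModifiedFileListFilter: the AgeCheck class and its isOldEnough method are hypothetical names, and treating a file that is exactly as old as the configured age as acceptable is an assumption.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of Spring Integration.
final class AgeCheck {

    // Accepts a file only when its last modification lies at least `age` in the past.
    // Assumption: a file that is exactly `age` old is accepted.
    static boolean isOldEnough(long lastModifiedMillis, long nowMillis, Duration age) {
        return nowMillis - lastModifiedMillis >= age.toMillis();
    }
}
```

With an age of 120 seconds, a file modified 60 seconds ago is rejected on this poll and becomes eligible on a later one.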
Starting with version 4.3.7, a ChainFileListFilter
(an extension of CompositeFileListFilter
) has been introduced to allow scenarios when subsequent filters should only see the result of the previous filter.
(With the CompositeFileListFilter
, all filters see all the files, but it passes only files that have passed all filters).
An example of where the new behavior is required is a combination of LastModifiedFileListFilter
and AcceptOnceFileListFilter
, when we do not wish to accept the file until some amount of time has elapsed.
With the CompositeFileListFilter
, since the AcceptOnceFileListFilter
sees all the files on the first pass, it does not pass it later when the other filter does.
The CompositeFileListFilter
approach is useful when a pattern filter is combined with a custom filter that looks for a secondary file to indicate that file transfer is complete.
The pattern filter might only pass the primary file (such as something.txt
) but the “done” filter needs to see whether (for example) something.done
is present.
Say we have files a.txt
, a.done
, and b.txt
.
The pattern filter passes only a.txt
and b.txt
, while the “done” filter sees all three files and passes only a.txt
.
The final result of the composite filter is that only a.txt
is released.
With the ChainFileListFilter, if any filter in the chain returns an empty list, the remaining filters are not invoked.
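The difference between the two composition styles can be sketched in plain Java using the a.txt, a.done, and b.txt example. Everything here is hypothetical and only models the semantics described above: the FilterComposition class, the string-based filters, and the composite and chain methods are stand-ins, not the framework's CompositeFileListFilter or ChainFileListFilter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-ins: each filter maps a list of file names to the accepted names.
final class FilterComposition {

    // Pattern filter: passes only names ending in ".txt".
    static List<String> pattern(List<String> names) {
        List<String> out = new ArrayList<>();
        for (String n : names) {
            if (n.endsWith(".txt")) {
                out.add(n);
            }
        }
        return out;
    }

    // "Done" filter: passes x.txt only when x.done is in the list it was given.
    static List<String> done(List<String> names) {
        List<String> out = new ArrayList<>();
        for (String n : names) {
            if (n.endsWith(".txt")
                    && names.contains(n.substring(0, n.length() - 4) + ".done")) {
                out.add(n);
            }
        }
        return out;
    }

    // Composite style: every filter sees the full list; a name must pass all filters.
    static List<String> composite(List<String> names,
                                  List<UnaryOperator<List<String>>> filters) {
        List<String> result = new ArrayList<>(names);
        for (UnaryOperator<List<String>> filter : filters) {
            result.retainAll(filter.apply(names));
        }
        return result;
    }

    // Chain style: each filter sees only what the previous one passed
    // and the chain stops as soon as the list is empty.
    static List<String> chain(List<String> names,
                              List<UnaryOperator<List<String>>> filters) {
        List<String> result = new ArrayList<>(names);
        for (UnaryOperator<List<String>> filter : filters) {
            if (result.isEmpty()) {
                break;
            }
            result = filter.apply(result);
        }
        return result;
    }
}
```

In this model, the composite releases only a.txt. The chain, with the pattern filter first, releases nothing, because the "done" filter never sees a.done. This is why the composite approach suits the marker-file case.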
Version 5.0 introduced an ExpressionFileListFilter
to evaluate a SpEL expression against a file as the root object of the evaluation context.
For this purpose, all the XML components for file handling (local and remote), along with an existing filter
attribute, have been supplied with the filter-expression
option, as the following example shows:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.*\.text'"
auto-startup="false"/>
Version 5.0.5 introduced the DiscardAwareFileListFilter
interface for filter implementations that have an interest in rejected files.
For this purpose, such a filter implementation should be supplied with a callback through addDiscardCallback(Consumer<File>)
.
In the framework, this functionality is used from the FileReadingMessageSource.WatchServiceDirectoryScanner
, in combination with LastModifiedFileListFilter
.
Unlike the regular DirectoryScanner
, the WatchService
provides files for processing according to the events on the target file system.
At the moment of polling an internal queue with those files, the LastModifiedFileListFilter
may discard them because they are too young relative to its configured age
.
Because the file system event is not emitted again, the file would otherwise be lost to subsequent polls.
The discard callback hook lets us retain the file in the internal queue so that it is available to be checked against the age
in subsequent polls.
The CompositeFileListFilter
also implements a DiscardAwareFileListFilter
and populates a discard callback to all its DiscardAwareFileListFilter
delegates.
Since CompositeFileListFilter matches the files against all delegates, the discardCallback may be called several times for the same file.
Starting with version 5.1, the FileReadingMessageSource
doesn’t check a directory for existence and doesn’t create it until its start()
is called (typically via wrapping SourcePollingChannelAdapter
).
Previously, there was no simple way to prevent an operating system permissions error when referencing the directory, for example from tests, or when permissions are applied later.
Message Headers
Starting with version 5.0, the FileReadingMessageSource
(in addition to the payload
as a polled File
) populates the following headers to the outbound Message
:
- FileHeaders.FILENAME: The File.getName() of the file to send. Can be used for subsequent rename or copy logic.
- FileHeaders.ORIGINAL_FILE: The File object itself. Typically, this header is populated automatically by framework components (such as splitters or transformers) when we lose the original File object. However, for consistency and convenience with any other custom use cases, this header can be useful to get access to the original file.
- FileHeaders.RELATIVE_PATH: A new header introduced to represent the part of the file path relative to the root directory for the scan. This header can be useful when the requirement is to restore a source directory hierarchy in other places. For this purpose, the DefaultFileNameGenerator (see "Generating File Names") can be configured to use this header.
Directory Scanning and Polling
The FileReadingMessageSource
does not produce messages for files from the directory immediately.
It uses an internal queue for 'eligible files' returned by the scanner
.
The scanEachPoll
option is used to ensure that the internal queue is refreshed with the latest input directory content on each poll.
By default (scanEachPoll = false
), the FileReadingMessageSource
empties its queue before scanning the directory again.
This default behavior is particularly useful to reduce scans of large numbers of files in a directory.
However, in cases where custom ordering is required, it is important to consider the effects of setting this flag to true
.
The order in which files are processed may not be as expected.
By default, files in the queue are processed in their natural (path
) order.
New files added by a scan, even when the queue already has files, are inserted in the appropriate position to maintain that natural order.
To customize the order, the FileReadingMessageSource
can accept a Comparator<File>
as a constructor argument.
It is used by the internal (PriorityBlockingQueue
) to reorder its content according to the business requirements.
Therefore, to process files in a specific order, you should provide a comparator to the FileReadingMessageSource
rather than ordering the list produced by a custom DirectoryScanner
.
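The effect of a comparator on processing order can be sketched with a plain PriorityBlockingQueue. This is an illustration of the queue behavior only, not the message source itself. The OrderedFileQueue class and its processingOrder method are hypothetical names.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical helper for illustration; not part of Spring Integration.
final class OrderedFileQueue {

    // Puts the scanned files into a comparator-ordered queue and drains it,
    // returning the file names in the order they would be processed.
    static List<String> processingOrder(List<File> scanned, Comparator<File> order) {
        PriorityBlockingQueue<File> queue = new PriorityBlockingQueue<>(11, order);
        queue.addAll(scanned);
        List<String> names = new ArrayList<>();
        File next;
        while ((next = queue.poll()) != null) {
            names.add(next.getName());
        }
        return names;
    }
}
```

Because the queue applies the comparator on every insertion, files added by a later scan are still placed correctly, which a pre-sorted list from a scanner cannot guarantee.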
Version 5.0 introduced RecursiveDirectoryScanner
to perform file tree visiting.
The implementation is based on the Files.walk(Path start, int maxDepth, FileVisitOption… options)
functionality.
The root directory (DirectoryScanner.listFiles(File)
) argument is excluded from the result.
All other sub-directory inclusions and exclusions are based on the target FileListFilter
implementation.
For example, the SimplePatternFileListFilter
filters out directories by default.
See AbstractDirectoryAwareFileListFilter
and its implementations for more information.
Starting with version 5.5, the FileInboundChannelAdapterSpec of the Java DSL has a convenient recursive(boolean) option to use a RecursiveDirectoryScanner in the target FileReadingMessageSource instead of the default one.
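The tree-walking behavior can be approximated with the JDK alone. The following is a minimal sketch of a Files.walk traversal that excludes the root from the result. It is not the RecursiveDirectoryScanner implementation, and the TreeWalk class, its methods, and the temporary directory layout are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration; not part of Spring Integration.
final class TreeWalk {

    // Lists everything below root up to maxDepth, excluding root itself,
    // as sorted paths relative to root (with '/' as the separator).
    static List<String> listBelow(Path root, int maxDepth) {
        try (Stream<Path> walk = Files.walk(root, maxDepth)) {
            return walk.filter(p -> !p.equals(root))
                    .map(p -> root.relativize(p).toString().replace('\\', '/'))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a small temporary tree (a.txt and sub/b.txt) and walks it.
    static List<String> demo() {
        try {
            Path root = Files.createTempDirectory("walk-demo");
            Files.createFile(root.resolve("a.txt"));
            Files.createDirectory(root.resolve("sub"));
            Files.createFile(root.resolve("sub").resolve("b.txt"));
            return listBelow(root, Integer.MAX_VALUE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The walk returns directories as well as files, so whether sub appears in the final result depends on the filter applied afterwards.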
Namespace Support
The configuration for file reading can be simplified by using the file-specific namespace. To do so, use the following template:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
Within this namespace, you can reduce the FileReadingMessageSource configuration and wrap it in an inbound channel adapter, as follows:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
The first channel adapter example relies on the default FileListFilter
implementations:
- IgnoreHiddenFileListFilter (do not process hidden files)
- AcceptOnceFileListFilter (prevent duplication)
Therefore, you can also leave off the prevent-duplicates
and ignore-hidden
attributes, as they are true
by default.
Spring Integration 4.2 introduced the |
The second channel adapter example uses a custom filter, the third uses the filename-pattern attribute to add an AntPathMatcher-based filter, and the fourth uses the filename-regex attribute to add a regular expression pattern-based filter to the FileReadingMessageSource.
The filename-pattern
and filename-regex
attributes are each mutually exclusive with the regular filter
reference attribute.
However, you can use the filter
attribute to reference an instance of CompositeFileListFilter
that combines any number of filters, including one or more pattern-based filters to fit your particular needs.
When multiple processes read from the same directory, you may want to lock files to prevent them from being picked up concurrently.
To do so, you can use a FileLocker
.
There is a java.nio
-based implementation available, but it is also possible to implement your own locking scheme.
The nio
locker can be injected as follows:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
You can configure a custom locker as follows:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
When a file inbound adapter is configured with a locker, it takes responsibility for acquiring a lock before the file is allowed to be received.
It does not assume the responsibility to unlock the file.
If you have processed the file and keep the locks hanging around, you have a memory leak.
If this is a problem, you should call FileLocker.unlock(File file) yourself at the appropriate time.
When filtering and locking files is not enough, you might need to control the way files are listed entirely.
To implement this type of requirement, you can use an implementation of DirectoryScanner
.
This scanner lets you determine exactly what files are listed in each poll.
This is also the interface that Spring Integration uses internally to wire FileListFilter
instances and FileLocker
to the FileReadingMessageSource
.
You can inject a custom DirectoryScanner
into the <int-file:inbound-channel-adapter/>
on the scanner
attribute, as the following example shows:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
Doing so gives you full freedom to choose the ordering, listing, and locking strategies.
It is also important to understand that filters (including patterns
, regex
, prevent-duplicates
, and others) and locker
instances are actually used by the scanner
.
Any of these attributes set on the adapter are subsequently injected into the internal scanner
.
For the case of an external scanner
, all filter and locker attributes are prohibited on the FileReadingMessageSource
.
They must be specified (if required) on that custom DirectoryScanner
.
In other words, if you inject a scanner
into the FileReadingMessageSource
, you should supply filter
and locker
on that scanner
, not on the FileReadingMessageSource
.
By default, the DefaultDirectoryScanner uses an IgnoreHiddenFileListFilter and an AcceptOnceFileListFilter.
To prevent their use, you can configure your own filter (such as AcceptAllFileListFilter) or even set it to null.
WatchServiceDirectoryScanner
The FileReadingMessageSource.WatchServiceDirectoryScanner
relies on file-system events when new files are added to the directory.
During initialization, the directory is registered to generate events.
The initial file list is also built during initialization.
While walking the directory tree, any subdirectories encountered are also registered to generate events.
On the first poll, the initial file list from walking the directory is returned.
On subsequent polls, files from new creation events are returned.
If a new subdirectory is added, its creation event is used to walk the new subtree to find existing files and register any new subdirectories found.
There is an issue with WatchKey when its internal events queue is not drained by the program as quickly as the directory modification events occur.
If the queue size is exceeded, a StandardWatchEventKinds.OVERFLOW is emitted to indicate that some file system events may be lost.
In this case, the root directory is re-scanned completely.
To avoid duplicates, consider using an appropriate FileListFilter (such as the AcceptOnceFileListFilter) or removing files when processing is complete.
The WatchServiceDirectoryScanner
can be enabled through the FileReadingMessageSource.use-watch-service
option, which is mutually exclusive with the scanner
option.
An internal FileReadingMessageSource.WatchServiceDirectoryScanner
instance is populated for the provided directory
.
In addition, now the WatchService
polling logic can track the StandardWatchEventKinds.ENTRY_MODIFY
and StandardWatchEventKinds.ENTRY_DELETE
.
If you need to track the modification of existing files as well as new files, you should implement the ENTRY_MODIFY
events logic in the FileListFilter
.
Otherwise, the files from those events are treated the same way.
The ResettableFileListFilter
implementations pick up the ENTRY_DELETE
events.
Consequently, their files are provided for the remove()
operation.
When this event is enabled, filters such as the AcceptOnceFileListFilter
have the file removed.
As a result, if a file with the same name appears, it passes the filter and is sent as a message.
For this purpose, the watch-events
property (FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
) has been introduced.
(WatchEventType
is a public inner enumeration in FileReadingMessageSource
.)
With such an option, we can use one downstream flow logic for new files and use some other logic for modified files.
The following example shows how to configure different logic for create and modify events in the same directory:
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
Limiting Memory Consumption
You can use a HeadDirectoryScanner
to limit the number of files retained in memory.
This can be useful when scanning large directories.
With XML configuration, this is enabled by setting the queue-size
property on the inbound channel adapter.
Prior to version 4.2, this setting was incompatible with the use of any other filters.
Any other filters (including prevent-duplicates="true"
) overwrote the filter used to limit the size.
The use of a Generally, instead of using an |
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
'tail'ing Files
Another popular use case is to get 'lines' from the end (or tail) of a file, capturing new lines when they are added.
Two implementations are provided.
The first, OSDelegatingFileTailingMessageProducer
, uses the native tail
command (on operating systems that have one).
This is generally the most efficient implementation on those platforms.
For operating systems that do not have a tail
command, the second implementation, ApacheCommonsFileTailingMessageProducer
, uses the Apache commons-io
Tailer
class.
In both cases, file system events, such as files being unavailable and other events, are published as ApplicationEvent
instances by using the normal Spring event publishing mechanism.
Examples of such events include the following:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
The sequence of events shown in the preceding example might occur, for example, when a file is rotated.
Starting with version 5.0, a FileTailingIdleEvent
is emitted when there is no data in the file during idleEventInterval
.
The following example shows what such an event looks like:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
Not all platforms that support a tail command provide these status messages.
Messages emitted from these endpoints have the following headers:
-
FileHeaders.ORIGINAL_FILE
: TheFile
object -
FileHeaders.FILENAME
: The file name (File.getName()
)
In versions prior to version 5.0, the FileHeaders.FILENAME header contained a string representation of the file’s absolute path.
You can now obtain that string representation by calling getAbsolutePath() on the original file header.
The following example creates a native adapter with the default options ('-F -n 0', meaning to follow the file name from the current end).
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
The following example creates a native adapter with '-F -n +0' options (meaning follow the file name, emitting all existing lines).
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay="10000"
file="/tmp/foo"/>
If the tail
command fails (on some platforms, a missing file causes the tail
to fail, even with -F
specified), the command is retried every 10 seconds.
By default, native adapters capture from standard output and send the content as messages.
They also capture from standard error to raise events.
Starting with version 4.3.6, you can discard the standard error events by setting the enable-status-reader
to false
, as the following example shows:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
In the following example, idle-event-interval is set to 5000, meaning that, if no lines are written for five seconds, a FileTailingIdleEvent is triggered every five seconds:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
This can be useful when you need to stop the adapter.
The following example creates an Apache commons-io
Tailer
adapter that examines the file for new lines every two seconds and checks for existence of a missing file every ten seconds:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false"
reopen="true"
file-delay="10000"/>
Setting end="false" tails the file from the beginning instead of the end (which is the default).
Setting reopen="true" reopens the file for each chunk (the default is to keep the file open).
Specifying the delay, end, or reopen attributes forces the use of the Apache commons-io adapter and makes the native-options attribute unavailable.
Dealing With Incomplete Data
A common problem in file-transfer scenarios is how to determine that the transfer is complete so that you do not start reading an incomplete file.
A common technique to solve this problem is to write the file with a temporary name and then atomically rename it to the final name.
This technique, together with a filter that masks the temporary file from being picked up by the consumer, provides a robust solution.
This technique is used by Spring Integration components that write files (locally or remotely).
By default, they append .writing
to the file name and remove it when the transfer is complete.
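The temporary-name technique can be sketched with the JDK alone. This is a minimal illustration, not the code used by the Spring Integration components. The TempNameWriter class and its methods are hypothetical, and the sketch assumes the temporary and final files live in the same directory on a file system that supports atomic moves.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for illustration; not part of Spring Integration.
final class TempNameWriter {

    // Writes the content under "<name>.writing", then renames it to the final name in one step.
    static Path write(Path directory, String name, String content) {
        try {
            Path temp = directory.resolve(name + ".writing");
            Path target = directory.resolve(name);
            Files.write(temp, content.getBytes(StandardCharsets.UTF_8));
            // ATOMIC_MOVE fails with an IOException subtype if the file system cannot do it.
            return Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes one file into a temporary directory and checks the outcome.
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("rename-demo");
            Path target = write(dir, "report.txt", "hello");
            String content = new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
            return content.equals("hello")
                    && Files.notExists(dir.resolve("report.txt.writing"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A consumer that ignores names ending in .writing never sees a partially written file, because the final name appears only after all bytes are on disk.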
Another common technique is to write a second “marker” file to indicate that the file transfer is complete.
In this scenario, you should not consider somefile.txt
(for example) to be available for use until somefile.txt.complete
is also present.
Spring Integration version 5.0 introduced new filters to support this mechanism.
Implementations are provided for the file system (FileSystemMarkerFilePresentFileListFilter
), FTP and SFTP.
They are configurable such that the marker file can have any name, although it is usually related to the file being transferred.
See the Javadoc for more information.
Writing Files
To write messages to the file system, you can use a FileWritingMessageHandler
.
This class can deal with the following payload types:
-
File
-
String
-
byte array
-
InputStream
(since version 4.2)
For a String payload, you can configure the encoding and the charset.
To make things easier, you can configure the FileWritingMessageHandler
as part of an outbound channel adapter or outbound gateway by using the XML namespace.
Starting with version 4.3, you can specify the buffer size to use when writing files.
Starting with version 5.1, you can provide a BiConsumer<File, Message<?>>
newFileCallback
which is triggered if you use FileExistsMode.APPEND
or FileExistsMode.APPEND_NO_FLUSH
and a new file has to be created.
This callback receives a newly created file and the message which triggered it.
This callback could be used to write a CSV header defined in the message header, for example.
Generating File Names
In its simplest form, the FileWritingMessageHandler
requires only a destination directory for writing the files.
The name of the file to be written is determined by the handler’s FileNameGenerator
.
The default implementation looks for a message header whose key matches the constant defined as FileHeaders.FILENAME
.
Alternatively, you can specify an expression to be evaluated against the message to generate a file name — for example, headers['myCustomHeader'] + '.something'
.
The expression must evaluate to a String
.
For convenience, the DefaultFileNameGenerator
also provides the setHeaderName
method, letting you explicitly specify the message header whose value is to be used as the filename.
Once set up, the DefaultFileNameGenerator
employs the following resolution steps to determine the filename for a given message payload:
- Evaluate the expression against the message and, if the result is a non-empty String, use it as the filename.
- Otherwise, if the payload is a java.io.File, use the File object’s filename.
- Otherwise, use the message ID appended with .msg as the filename.
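The resolution steps can be written as a short fallback chain. This is a sketch of the documented order only, not the DefaultFileNameGenerator source. The FileNameResolution class is hypothetical, and the expressionResult parameter stands for whatever the configured expression evaluated to (possibly null).

```java
import java.io.File;
import java.util.UUID;

// Hypothetical helper for illustration; not part of Spring Integration.
final class FileNameResolution {

    static String resolve(Object expressionResult, Object payload, UUID messageId) {
        // Step 1: a non-empty String from the expression wins.
        if (expressionResult instanceof String && !((String) expressionResult).isEmpty()) {
            return (String) expressionResult;
        }
        // Step 2: fall back to the name of a File payload.
        if (payload instanceof File) {
            return ((File) payload).getName();
        }
        // Step 3: fall back to the message ID with a ".msg" suffix.
        return messageId + ".msg";
    }
}
```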
When you use the XML namespace support, both the file outbound channel adapter and the file outbound gateway support the following mutually exclusive configuration attributes:
- filename-generator (a reference to a FileNameGenerator implementation)
- filename-generator-expression (an expression that evaluates to a String)
While writing files, a temporary file suffix is used (its default is .writing
).
It is appended to the filename while the file is being written.
To customize the suffix, you can set the temporary-file-suffix
attribute on both the file outbound channel adapter and the file outbound gateway.
When using the APPEND file mode, the temporary-file-suffix attribute is ignored, since the data is appended to the file directly.
Starting with version 4.2.5, the generated file name (as a result of filename-generator
or filename-generator-expression
evaluation) can represent a child path together with the target file name.
It is used as a second constructor argument for File(File parent, String child)
as before.
However, in the past we did not create (mkdirs()
) directories for the child path, assuming only the file name.
This approach is useful for cases when we need to restore the file system tree to match the source directory — for example, when unzipping the archive and saving all the files in the target directory in the original order.
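The handling of a child path can be pictured as follows. This is a minimal sketch under stated assumptions, not the handler's actual code: the ChildPathTarget class, its methods, and the archive/2021/data.txt name are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical helper for illustration; not part of Spring Integration.
final class ChildPathTarget {

    // Resolves a generated name that may contain sub-directories
    // and creates any missing parent directories of the target file.
    static File prepare(File directory, String generatedName) {
        File target = new File(directory, generatedName);
        File parent = target.getParentFile();
        if (!parent.isDirectory() && !parent.mkdirs()) {
            throw new IllegalStateException("Could not create " + parent);
        }
        return target;
    }

    // Prepares a nested target below a temporary directory and checks the result.
    static boolean demo() {
        try {
            File root = Files.createTempDirectory("child-path-demo").toFile();
            File target = prepare(root, "archive/2021/data.txt");
            return target.getParentFile().isDirectory()
                    && target.getName().equals("data.txt")
                    && !target.exists();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```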
Specifying the Output Directory
Both the file outbound channel adapter and the file outbound gateway provide two mutually exclusive configuration attributes for specifying the output directory:
-
directory
-
directory-expression
Spring Integration 2.2 introduced the directory-expression attribute.
Using the directory
Attribute
When you use the directory
attribute, the output directory is set to a fixed value, which is set when the FileWritingMessageHandler
is initialized.
If you do not specify this attribute, you must use the directory-expression
attribute.
Using the directory-expression
Attribute
If you want to have full SpEL support, you can use the directory-expression
attribute.
This attribute accepts a SpEL expression that is evaluated for each message being processed.
Thus, you have full access to a message’s payload and its headers when you dynamically specify the output file directory.
The SpEL expression must resolve to either a String
, java.io.File
or org.springframework.core.io.Resource
.
(The latter is evaluated into a File
anyway.)
Furthermore, the resulting String
or File
must point to a directory.
If you do not specify the directory-expression
attribute, then you must set the directory
attribute.
Using the auto-create-directory
Attribute
By default, if the destination directory does not exist, the respective destination directory and any non-existing parent directories are automatically created.
To prevent that behavior, you can set the auto-create-directory
attribute to false
.
This attribute applies to both the directory
and the directory-expression
attributes.
When using the Instead of checking for the existence of the destination directory when the adapter is initialized, this check is now performed for each message being processed. Furthermore, if |
Dealing with Existing Destination Files
When you write files and the destination file already exists, the default behavior is to overwrite that target file.
You can change this behavior by setting the mode
attribute on the relevant file outbound components.
The following options exist:
-
REPLACE
(Default) -
REPLACE_IF_MODIFIED
-
APPEND
-
APPEND_NO_FLUSH
-
FAIL
-
IGNORE
Spring Integration 2.2 introduced the mode attribute and the APPEND , FAIL , and IGNORE options.
REPLACE
-
If the target file already exists, it is overwritten. If the
mode
attribute is not specified, this is the default behavior when writing files.
REPLACE_IF_MODIFIED
-
If the target file already exists, it is overwritten only if the last modified timestamp differs from that of the source file. For
File
payloads, the payload lastModified
time is compared to the existing file. For other payloads, the FileHeaders.SET_MODIFIED
(file_setModified
) header is compared to the existing file. If the header is missing or has a value that is not a Number
, the file is always replaced.
APPEND
-
This mode lets you append message content to the existing file instead of creating a new file each time. Note that this attribute is mutually exclusive with the
temporary-file-suffix
attribute because, when it appends content to the existing file, the adapter no longer uses a temporary file. The file is closed after each message.
APPEND_NO_FLUSH
-
This option has the same semantics as
APPEND
, but the data is not flushed and the file is not closed after each message. This can provide a significant performance improvement at the risk of data loss in the event of a failure. See Flushing Files When Using APPEND_NO_FLUSH
for more information.
FAIL
-
If the target file exists, a
MessageHandlingException
is thrown.
IGNORE
-
If the target file exists, the message payload is silently ignored.
When using a temporary file suffix (the default is .writing ), the IGNORE option applies if either the final file name or the temporary file name exists.
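The following plain-JDK sketch models how four of these modes resolve an existing destination file. It is a simplified illustration and not the framework's implementation: the class name, the nested `Mode` enum, and the use of `IllegalStateException` (in place of `MessageHandlingException`) are assumptions made for the example, and `REPLACE_IF_MODIFIED` and `APPEND_NO_FLUSH` are left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

final class FileExistsModeSketch {

    // Hypothetical stand-in for the mode attribute values described above.
    enum Mode { REPLACE, APPEND, FAIL, IGNORE }

    /** Writes content to target, handling an existing target according to mode. */
    static void write(Path target, String content, Mode mode) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        try {
            switch (mode) {
                case REPLACE:
                    // Default options create the file or truncate an existing one.
                    Files.write(target, bytes);
                    break;
                case APPEND:
                    Files.write(target, bytes, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
                    break;
                case FAIL:
                    if (Files.exists(target)) {
                        throw new IllegalStateException("Destination file already exists: " + target);
                    }
                    Files.write(target, bytes);
                    break;
                case IGNORE:
                    if (!Files.exists(target)) {
                        Files.write(target, bytes);
                    }
                    break;
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Applies a sequence of writes to one file and records the content after each step. */
    static List<String> demo() {
        List<String> snapshots = new ArrayList<>();
        try {
            Path target = Files.createTempDirectory("modes").resolve("out.txt");
            Mode[] steps = { Mode.REPLACE, Mode.APPEND, Mode.IGNORE, Mode.FAIL, Mode.REPLACE };
            String[] payloads = { "a", "b", "c", "d", "e" };
            for (int i = 0; i < steps.length; i++) {
                try {
                    write(target, payloads[i], steps[i]);
                }
                catch (IllegalStateException rejected) {
                    // FAIL refused to touch the existing file.
                }
                snapshots.add(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return snapshots;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only `REPLACE` and `APPEND` change an existing file; `IGNORE` and `FAIL` leave it untouched and differ only in whether the caller is told.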
Flushing Files When Using APPEND_NO_FLUSH
The APPEND_NO_FLUSH
mode was added in version 4.3.
Using it can improve performance because the file is not closed after each message.
However, this can cause data loss in the event of a failure.
Spring Integration provides several flushing strategies to mitigate this data loss:
-
Use
flushInterval
. If a file is not written to for this period of time, it is automatically flushed. This is approximate and may be up to 1.33x
this time (with an average of 1.167x
).
-
Send a message containing a regular expression to the message handler’s
trigger
method. Files with absolute path names matching the pattern are flushed.
-
Provide the handler with a custom
MessageFlushPredicate
implementation to modify the action taken when a message is sent to the trigger
method.
-
Invoke one of the handler’s
flushIfNeeded
methods by passing in a custom FileWritingMessageHandler.FlushPredicate
or FileWritingMessageHandler.MessageFlushPredicate
implementation.
The predicates are called for each open file. See the Javadoc for these interfaces for more information. Note that, since version 5.0, the predicate methods provide another parameter: the time that the current file was first written to if new or previously closed.
When using flushInterval
, the interval starts at the last write.
The file is flushed only if it is idle for the interval.
Starting with version 4.3.7, an additional property (flushWhenIdle
) can be set to false
, meaning that the interval starts with the first write to a previously flushed (or new) file.
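The 1.33x and 1.167x figures quoted for flushInterval can be reproduced with a little arithmetic. The sketch below assumes that idle files are checked on a fixed schedule three times per flushInterval; that scheduling detail is an assumption chosen because it is consistent with the quoted figures, and the class and method names are hypothetical.

```java
final class FlushDelaySketch {

    /**
     * Worst case: the file becomes eligible just after a check has run,
     * so it waits one more full check period (interval / checksPerInterval).
     */
    static double worstCaseFactor(int checksPerInterval) {
        return 1.0 + 1.0 / checksPerInterval;
    }

    /**
     * Average case: eligibility falls uniformly between two checks,
     * so the extra wait is half a check period on average.
     */
    static double averageFactor(int checksPerInterval) {
        return 1.0 + 0.5 / checksPerInterval;
    }

    public static void main(String[] args) {
        int checksPerInterval = 3; // assumed check frequency
        System.out.printf("worst case: %.3fx%n", worstCaseFactor(checksPerInterval));
        System.out.printf("average:    %.3fx%n", averageFactor(checksPerInterval));
    }
}
```

Under that assumption, a flushInterval of 30 seconds means an idle file is flushed between 30 and 40 seconds after its last write, 35 seconds on average.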
File Timestamps
By default, the destination file’s lastModified
timestamp is the time when the file was created (except that an in-place rename retains the current timestamp).
Starting with version 4.3, you can now configure preserve-timestamp
(or setPreserveTimestamp(true)
when using Java configuration).
For File
payloads, this transfers the timestamp from the inbound file to the outbound (regardless of whether a copy was required).
For other payloads, if the FileHeaders.SET_MODIFIED
header (file_setModified
) is present, it is used to set the destination file’s lastModified
timestamp, as long as the header is a Number
.
File Permissions
Starting with version 5.0, when writing files to a file system that supports Posix permissions, you can specify those permissions on the outbound channel adapter or gateway.
The property is an integer and is usually supplied in the familiar octal format — for example, 0640
, meaning that the owner has read/write permissions, the group has read-only permission, and others have no access.
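To make the octal notation concrete, the following plain-JDK sketch decodes an integer mode into the symbolic form used by `ls -l`. It only illustrates how the nine permission bits are read; the class and method names are hypothetical and are not part of Spring Integration.

```java
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.EnumSet;
import java.util.Set;

final class PosixModeSketch {

    // Permission bits from most significant (owner read, 0400) to least significant (others execute, 0001).
    private static final PosixFilePermission[] BITS = {
        PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE, PosixFilePermission.OWNER_EXECUTE,
        PosixFilePermission.GROUP_READ, PosixFilePermission.GROUP_WRITE, PosixFilePermission.GROUP_EXECUTE,
        PosixFilePermission.OTHERS_READ, PosixFilePermission.OTHERS_WRITE, PosixFilePermission.OTHERS_EXECUTE
    };

    /** Converts an integer mode such as 0640 into the matching set of permissions. */
    static Set<PosixFilePermission> fromMode(int mode) {
        Set<PosixFilePermission> permissions = EnumSet.noneOf(PosixFilePermission.class);
        for (int i = 0; i < BITS.length; i++) {
            if ((mode & (0400 >> i)) != 0) {
                permissions.add(BITS[i]);
            }
        }
        return permissions;
    }

    /** Renders the mode in rwxrwxrwx form. */
    static String toSymbolic(int mode) {
        return PosixFilePermissions.toString(fromMode(mode));
    }

    public static void main(String[] args) {
        System.out.println(toSymbolic(0640)); // rw-r-----
        System.out.println(0640);             // 416: a leading zero makes a Java literal octal
    }
}
```

Note that in Java source the leading zero matters: `0640` is the octal literal (decimal 416), whereas `640` is a different number with different permission bits.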
File Outbound Channel Adapter
The following example configures a file outbound channel adapter:
<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>
The namespace-based configuration also supports a delete-source-files
attribute.
If set to true
, it triggers the deletion of the original source files after writing to a destination.
The default value for that flag is false
.
The following example shows how to set it to true
:
<int-file:outbound-channel-adapter id="filesOut"
directory="${output.directory}"
delete-source-files="true"/>
The delete-source-files attribute has an effect only if the inbound message has a File payload or if the FileHeaders.ORIGINAL_FILE header value contains either the source File instance or a String representing the original file path.
Starting with version 4.2, the FileWritingMessageHandler
supports an append-new-line
option.
If set to true
, a new line is appended to the file after a message is written.
The default attribute value is false
.
The following example shows how to use the append-new-line
option:
<int-file:outbound-channel-adapter id="newlineAdapter"
append-new-line="true"
directory="${output.directory}"/>
Outbound Gateway
In cases where you want to continue processing messages based on the written file, you can use the outbound-gateway
instead.
It plays a role similar to that of the outbound-channel-adapter
.
However, after writing the file, it also sends it to the reply channel as the payload of a message.
The following example configures an outbound gateway:
<int-file:outbound-gateway id="mover" request-channel="moveInput"
reply-channel="output"
directory="${output.directory}"
mode="REPLACE" delete-source-files="true"/>
As mentioned earlier, you can also specify the mode
attribute, which defines the behavior of how to deal with situations where the destination file already exists.
See Dealing with Existing Destination Files for further details.
Generally, when using the file outbound gateway, the result file is returned as the message payload on the reply channel.
This also applies when specifying the IGNORE
mode.
In that case the pre-existing destination file is returned.
If the payload of the request message was a file, you still have access to that original file through the message header.
See FileHeaders.ORIGINAL_FILE.
The 'outbound-gateway' works well in cases where you want to first move a file and then send it through a processing pipeline.
In such cases, you may connect the file namespace’s inbound-channel-adapter element to the outbound-gateway and then connect that gateway’s reply-channel to the beginning of the pipeline.
If you have more elaborate requirements or need to support additional payload types as input to be converted to file content, you can extend the FileWritingMessageHandler
, but a much better option is to rely on a Transformer
.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the outbound adapter with Java configuration:
@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
}
@Bean
@ServiceActivator(inputChannel = "writeToFileChannel")
public MessageHandler fileWritingMessageHandler() {
Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
handler.setFileExistsMode(FileExistsMode.APPEND);
return handler;
}
@MessagingGateway(defaultRequestChannel = "writeToFileChannel")
public interface MyGateway {
void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
@Header("directory") File directory, String data);
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the outbound gateway with the Java DSL:
@SpringBootApplication
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
fileWritingInput.send(new GenericMessage<>("foo"));
}
@Bean
public IntegrationFlow fileWritingFlow() {
return IntegrationFlows.from("fileWritingInput")
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
.header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
.handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
.channel(MessageChannels.queue("fileWritingResultChannel"))
.get();
}
}
File Transformers
To transform data read from the file system to objects and the other way around, you need to do some work.
Unlike FileReadingMessageSource
and to a lesser extent FileWritingMessageHandler
, you probably need your own mechanism to get the job done.
For this, you can implement the Transformer
interface.
Alternatively, you can extend the AbstractFilePayloadTransformer
for inbound messages.
Spring Integration provides some obvious implementations.
See the Javadoc for the Transformer
interface to see which Spring Integration classes implement it.
Similarly, you can check the Javadoc for the AbstractFilePayloadTransformer
class to see which Spring Integration classes extend it.
FileToByteArrayTransformer
extends AbstractFilePayloadTransformer
and transforms a File
object into a byte[]
by using Spring’s FileCopyUtils
.
It is often better to use a sequence of transformers than to put all transformations in a single class.
In that case the File
to byte[]
conversion might be a logical first step.
FileToStringTransformer
extends AbstractFilePayloadTransformer
and converts a File
object to a String
.
If nothing else, this can be useful for debugging (consider using it with a wire tap).
To configure file-specific transformers, you can use the appropriate elements from the file namespace, as the following example shows:
<int-file:file-to-bytes-transformer input-channel="input" output-channel="output"
delete-files="true"/>
<int-file:file-to-string-transformer input-channel="input" output-channel="output"
delete-files="true" charset="UTF-8"/>
The delete-files
option signals to the transformer that it should delete the inbound file after the transformation is complete.
This is in no way a replacement for using an AcceptOnceFileListFilter
when the FileReadingMessageSource
is being used in a multi-threaded environment (such as when you use Spring Integration in general).
File Splitter
The FileSplitter
was added in version 4.1.2, and its namespace support was added in version 4.2.
The FileSplitter
splits text files into individual lines, based on BufferedReader.readLine()
.
By default, the splitter uses an Iterator
to emit lines one at a time as they are read from the file.
Setting the iterator
property to false
causes it to read all the lines into memory before emitting them as messages.
One use case for this might be if you want to detect I/O errors on the file before sending any messages containing lines.
However, it is only practical for relatively short files.
Inbound payloads can be File
, String
(a File
path), InputStream
, or Reader
.
Other payload types are emitted unchanged.
The following listing shows possible ways to configure a FileSplitter
:
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | The bean name of the splitter. |
2 | Set to true (the default) to use an iterator or false to load the file into memory before sending lines. |
3 | Set to true to emit start-of-file and end-of-file marker messages before and after the file data.
Markers are messages with FileSplitter.FileMarker payloads (with START and END values in the mark property).
You might use markers when sequentially processing files in a downstream flow where some lines are filtered.
They enable the downstream processing to know when a file has been completely processed.
In addition, a file_marker header that contains START or END is added to these messages.
The END marker includes a line count.
If the file is empty, only START and END markers are emitted with 0 as the lineCount .
The default is false .
When true , apply-sequence is false by default.
See also markers-json (the next attribute). |
4 | When markers is true, set this to true to have the FileMarker objects be converted to a JSON string.
(Uses a SimpleJsonSerializer underneath). |
5 | Set to false to disable the inclusion of sequenceSize and sequenceNumber headers in messages.
The default is true , unless markers is true .
When true and markers is true , the markers are included in the sequencing.
When true and iterator is true , the sequenceSize header is set to 0 , because the size is unknown. |
6 | Set to true to cause a RequiresReplyException to be thrown if there are no lines in the file.
The default is false . |
7 | Set the charset name to be used when reading text data into String payloads.
The default is the platform charset. |
8 | The header name for the first line to be carried as a header in the messages emitted for the remaining lines. Since version 5.0. |
9 | Set the input channel used to send messages to the splitter. |
10 | Set the output channel to which messages are sent. |
11 | Set the send timeout.
Only applies if the output-channel can block — such as a full QueueChannel . |
12 | Set to false to disable automatically starting the splitter when the context is refreshed.
The default is true . |
13 | Set the order of this endpoint if the input-channel is a <publish-subscribe-channel/> . |
14 | Set the startup phase for the splitter (used when auto-startup is true ). |
The FileSplitter
also splits any text-based InputStream
into lines.
Starting with version 4.3, when used in conjunction with an FTP or SFTP streaming inbound channel adapter or an FTP or SFTP outbound gateway that uses the stream
option to retrieve a file, the splitter automatically closes the session that supports the stream when the file is completely consumed.
See FTP Streaming Inbound Channel Adapter and SFTP Streaming Inbound Channel Adapter as well as FTP Outbound Gateway and SFTP Outbound Gateway for more information about these facilities.
When using Java configuration, an additional constructor is available, as the following example shows:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
When markersJson
is true, the markers are represented as a JSON string (using a SimpleJsonSerializer
).
Version 5.0 introduced the firstLineAsHeader
option to specify that the first line of content is a header (such as column names in a CSV file).
The argument passed to this property is the header name under which the first line is carried as a header in the messages emitted for the remaining lines.
This line is not included in the sequence header (if applySequence
is true) nor in the lineCount
associated with FileMarker.END
.
NOTE: Starting with version 5.5, the lineCount is also included as a FileHeaders.LINE_COUNT
header of the FileMarker.END
message, since the FileMarker
could be serialized into JSON.
If a file contains only the header line, the file is treated as empty and, therefore, only FileMarker
instances are emitted during splitting (if markers are enabled — otherwise, no messages are emitted).
By default (if no header name is set), the first line is considered to be data and becomes the payload of the first emitted message.
If you need more complex logic about header extraction from the file content (not first line, not the whole content of the line, not one particular header, and so on), consider using a header enricher ahead of the FileSplitter
.
Note that the lines that have been moved to the headers might be filtered downstream from the normal content process.
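The first-line-as-header behavior can be modeled with a few lines of plain JDK code built on `BufferedReader.readLine()`. This is a simplified sketch rather than the `FileSplitter` implementation; the `Line` holder class and the `split` method are hypothetical names, and markers and sequence headers are left out.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class FirstLineAsHeaderSketch {

    /** One emitted "message": the line payload plus the header line (null when no header is configured). */
    static final class Line {
        final String payload;
        final String firstLine;

        Line(String payload, String firstLine) {
            this.payload = payload;
            this.firstLine = firstLine;
        }
    }

    /** Splits text into lines; when firstLineAsHeader is set, the first line travels with every other line. */
    static List<Line> split(String text, boolean firstLineAsHeader) {
        List<Line> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            // The header line is consumed here and is never emitted as a payload of its own.
            String header = firstLineAsHeader ? reader.readLine() : null;
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(new Line(line, header));
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        for (Line line : split("id,name\n1,Ann\n2,Bob", true)) {
            System.out.println(line.firstLine + " -> " + line.payload);
        }
    }
}
```

In this model a file holding only the header line yields no lines at all, which mirrors the "treated as empty" rule described above, and the line count is the size of the returned list, so the header line is not counted.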
Idempotent Downstream Processing a Split File
When apply-sequence
is true, the splitter adds the line number in the SEQUENCE_NUMBER
header (when markers
is true, the markers are counted as lines).
The line number can be used with an Idempotent Receiver to avoid reprocessing lines after a restart.
For example:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}
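To see why comparing sequence numbers prevents reprocessing, the following plain-JDK sketch models the selector logic with an in-memory map. It is an illustration under stated assumptions: the map stands in for a persistent metadata store that survives a restart, and the class and method names are hypothetical rather than taken from the framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class IdempotentLineSketch {

    // Stand-in for a shared, persistent metadata store: file path -> last processed line number.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Returns true if the line has not been processed yet and records it as processed. */
    boolean accept(String filePath, int sequenceNumber) {
        String newVal = Integer.toString(sequenceNumber);
        String oldVal = store.putIfAbsent(filePath, newVal);
        if (oldVal == null) {
            return true; // first line ever seen for this file
        }
        // Same comparison as the compareValues predicate in the example above.
        if (Integer.parseInt(oldVal) < Integer.parseInt(newVal)) {
            return store.replace(filePath, oldVal, newVal);
        }
        return false; // already processed before the restart
    }

    /** Processes lines 1 to 3, then replays the file from line 1 as would happen after a restart. */
    static List<Integer> demo() {
        IdempotentLineSketch selector = new IdempotentLineSketch();
        List<Integer> processed = new ArrayList<>();
        for (int line = 1; line <= 3; line++) {
            if (selector.accept("/in/data.txt", line)) {
                processed.add(line);
            }
        }
        for (int line = 1; line <= 5; line++) {
            if (selector.accept("/in/data.txt", line)) {
                processed.add(line);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

During the replay, lines 1 to 3 are rejected because their numbers are not greater than the stored value, so each line is handled exactly once.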
File Aggregator
Starting with version 5.5, a FileAggregator
is introduced to cover the other side of the FileSplitter
use case, when START/END markers are enabled.
For convenience, the FileAggregator
implements all three sequence details strategies:
-
The
HeaderAttributeCorrelationStrategy
with the FileHeaders.FILENAME
attribute is used for correlation key calculation. When markers are enabled on the FileSplitter
, it does not populate sequence details headers, since START/END marker messages are also included in the sequence size. The FileHeaders.FILENAME
header is still populated for each line emitted, including START/END marker messages.
-
The
FileMarkerReleaseStrategy
checks for a FileSplitter.FileMarker.Mark.END
message in the group and then compares the FileHeaders.LINE_COUNT
header value with the group size minus 2
(the two FileSplitter.FileMarker
instances). It also implements a convenient GroupConditionProvider
contract for the conditionSupplier
function to be used in the AbstractCorrelatingMessageHandler
. See Message Group Condition for more information.
-
The
FileAggregatingMessageGroupProcessor
just removes FileSplitter.FileMarker
messages from the group and collects the rest of the messages into a list payload to produce.
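The release rule described above comes down to one comparison, sketched here in plain Java. This is a simplified model and not the `FileMarkerReleaseStrategy` source; the class name and the method signature are hypothetical.

```java
final class MarkerReleaseSketch {

    /**
     * Decides whether a group of messages for one file is complete.
     *
     * @param groupSize number of messages collected so far, including marker messages
     * @param lineCountFromEndMarker the line count carried by the END marker, or null if END has not arrived
     */
    static boolean canRelease(int groupSize, Integer lineCountFromEndMarker) {
        if (lineCountFromEndMarker == null) {
            return false; // without the END marker the total number of lines is unknown
        }
        // Two of the collected messages are the START and END markers; the rest must be lines.
        return groupSize - 2 == lineCountFromEndMarker;
    }

    public static void main(String[] args) {
        System.out.println(canRelease(4, 3));    // END arrived early, one line still missing
        System.out.println(canRelease(5, 3));    // START + 3 lines + END
        System.out.println(canRelease(5, null)); // END not seen yet
    }
}
```

Checking the count rather than just the presence of the END marker matters when lines are processed concurrently, because the END marker can then reach the aggregator before some of the lines do.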
The following listing shows possible ways to configure a FileAggregator
:
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
integrationFlow {
split(Files.splitter().markers().firstLineAsHeader("firstLine"))
channel { executor(taskExecutor) }
filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
transform(String::toUpperCase)
channel("aggregatorChannel")
aggregate(FileAggregator())
channel { queue("resultChannel") }
}
@ServiceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new FileAggregator());
aggregator.setOutputChannel(outputChannel);
return aggregator;
}
<int:chain input-channel="input" output-channel="output">
<int-file:splitter markers="true"/>
<int:aggregator>
<bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
</int:aggregator>
</int:chain>
If the default behavior of the FileAggregator
does not satisfy the target logic, it is recommended to configure an aggregator endpoint with individual strategies.
See the FileAggregator
Javadoc for more information.
Remote Persistent File List Filters
Inbound and streaming inbound remote file channel adapters (FTP
, SFTP
, and other technologies) are configured with corresponding implementations of AbstractPersistentFileListFilter
by default, configured with an in-memory MetadataStore
.
To run in a cluster, these can be replaced with filters using a shared MetadataStore
(see Metadata Store for more information).
These filters are used to prevent fetching the same file multiple times (unless its modified time changes).
Starting with version 5.2, a file is added to the filter immediately before the file is fetched (and reversed if the fetch fails).
In the event of a catastrophic failure (such as power loss), it is possible that the file currently being fetched will remain in the filter and won’t be re-fetched when restarting the application.
In this case you would need to manually remove this file from the MetadataStore .
In previous versions, the files were filtered before any were fetched, meaning that several files could be in this state after a catastrophic failure.
In order to facilitate this new behavior, two new methods have been added to FileListFilter
.
boolean accept(F file);
boolean supportsSingleFileFiltering();
If a filter returns true
in supportsSingleFileFiltering
, it must implement accept()
.
If a remote filter does not support single file filtering (such as the AbstractMarkerFilePresentFileListFilter
), the adapters revert to the previous behavior.
If multiple filters are in use (using a CompositeFileListFilter
or ChainFileListFilter
), then all of the delegate filters must support single file filtering in order for the composite filter to support it.
The persistent file list filters now have a boolean property forRecursion
.
Setting this property to true
also sets alwaysAcceptDirectories
, which means that the recursive operation on the outbound gateways (ls
and mget
) will now always traverse the full directory tree each time.
This is to solve a problem where changes deep in the directory tree were not detected.
In addition, forRecursion=true
causes the full path to files to be used as the metadata store keys; this solves a problem where the filter did not work properly if a file with the same name appears multiple times in different directories.
IMPORTANT: This means that existing keys in a persistent metadata store will not be found for files beneath the top level directory.
For this reason, the property is false
by default; this may change in a future release.
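The effect of the key choice can be shown with a small plain-JDK sketch. It is a simplified model that tracks only keys (the real filters also consider the modified time), and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class MetadataKeySketch {

    /** Counts how many remote files are accepted when each key may be seen only once. */
    static int acceptedCount(String[] remotePaths, boolean fullPathKeys) {
        Set<String> seenKeys = new HashSet<>();
        int accepted = 0;
        for (String path : remotePaths) {
            // Without full path keys, only the last path segment identifies the file.
            String key = fullPathKeys ? path : path.substring(path.lastIndexOf('/') + 1);
            if (seenKeys.add(key)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        String[] paths = { "2023/report.csv", "2024/report.csv" };
        System.out.println(acceptedCount(paths, false)); // the second file collides with the first
        System.out.println(acceptedCount(paths, true));  // both files are accepted
    }
}
```

The same sketch shows why switching the property on affects existing stores: a key previously recorded as `report.csv` no longer matches `2023/report.csv`, so the file is treated as new.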