public class FileReadingMessageSource extends AbstractMessageSource<File> implements ManageableLifecycle
MessageSource
that creates messages
from a file system directory.
To prevent messages for certain files, you may supply a FileListFilter
.
By default, when configuring with XML or the DSL,
an AcceptOnceFileListFilter
is used.
It ensures files are picked up only once from the directory.
A common problem with reading files is that a file may be detected before it
is ready. The default
AcceptOnceFileListFilter
does not prevent this. In most cases, this can be prevented if the
file-writing process renames each file as soon as it is ready for reading. A
pattern-matching filter that accepts only files that are ready (e.g. based on
a known suffix), composed with the default
AcceptOnceFileListFilter
would allow for this.
If a external DirectoryScanner
is used, then the FileLocker
and FileListFilter
objects should be set on the external
DirectoryScanner
, not the instance of FileReadingMessageSource. An
IllegalStateException
will result otherwise.
A Comparator
can be used to ensure internal ordering of the Files in
a PriorityBlockingQueue
. This does not provide the same guarantees as
a ResequencingMessageGroupProcessor
,
but in cases where writing files
and failure downstream are rare it might be sufficient.
FileReadingMessageSource is fully thread-safe under concurrent
receive()
invocations and message delivery callbacks.
Modifier and Type | Class and Description |
---|---|
static class |
FileReadingMessageSource.WatchEventType |
IntegrationManagement.ManagementOverrides
EXPRESSION_PARSER, logger
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Constructor and Description |
---|
FileReadingMessageSource()
Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
|
FileReadingMessageSource(Comparator<File> receptionOrderComparator)
Creates a FileReadingMessageSource with a
PriorityBlockingQueue
ordered with the passed in Comparator . |
FileReadingMessageSource(int internalQueueCapacity)
Creates a FileReadingMessageSource with a bounded queue of the given
capacity.
|
Modifier and Type | Method and Description |
---|---|
protected AbstractIntegrationMessageBuilder<File> |
doReceive()
Subclasses must implement this method.
|
String |
getComponentType() |
DirectoryScanner |
getScanner()
The
scanner property accessor to allow to modify its options
(filter , locker etc.) at runtime using the
FileReadingMessageSource bean. |
boolean |
isRunning() |
boolean |
isUseWatchService() |
void |
onFailure(Message<File> failedMessage)
Adds the failed message back to the 'toBeReceived' queue if there is room.
|
protected void |
onInit() |
void |
setAutoCreateDirectory(boolean autoCreateDirectory)
Specify whether to create the source directory automatically if it does
not yet exist upon initialization.
|
void |
setDirectory(File directory)
Specify the input directory.
|
void |
setFilter(FileListFilter<File> filter)
Sets a
FileListFilter . |
void |
setLocker(FileLocker locker)
Optional.
|
void |
setScanEachPoll(boolean scanEachPoll)
Optional.
|
void |
setScanner(DirectoryScanner scanner)
Optionally specify a custom scanner, for example the
WatchServiceDirectoryScanner |
void |
setUseWatchService(boolean useWatchService)
Switch this
FileReadingMessageSource to use its internal
FileReadingMessageSource.WatchServiceDirectoryScanner . |
void |
setWatchEvents(FileReadingMessageSource.WatchEventType... watchEvents)
The
WatchService event types. |
void |
start() |
void |
stop() |
buildMessage, destroy, getBeanName, getComponentName, getManagedName, getManagedType, getOverrides, isLoggingEnabled, receive, registerMetricsCaptor, setBeanName, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedType
afterPropertiesSet, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, getBeanFactory, getEvaluationContext, getEvaluationContext, getMessageBuilderFactory, setBeanFactory, setConversionService
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
getIntegrationPatternType
getThisAs
public FileReadingMessageSource()
public FileReadingMessageSource(int internalQueueCapacity)
internalQueueCapacity
- the size of the queue used to cache files to be received
internally. This queue can be made larger to optimize the
directory scanning. With scanEachPoll set to false and the
queue to a large size, it will be filled once and then
completely emptied before a new directory listing is done.
This is particularly useful to reduce scans of large numbers
of files in a directory.public FileReadingMessageSource(@Nullable Comparator<File> receptionOrderComparator)
PriorityBlockingQueue
ordered with the passed in Comparator
.
The size of the queue used should be large enough to hold all the files in the input directory in order to sort all of them, so restricting the size of the queue is mutually exclusive with ordering. No guarantees about file delivery order can be made under concurrent access.
receptionOrderComparator
- the comparator to be used to order the files in the internal
queuepublic void setDirectory(File directory)
directory
- to monitorpublic void setScanner(DirectoryScanner scanner)
WatchServiceDirectoryScanner
scanner
- scanner implementationpublic DirectoryScanner getScanner()
scanner
property accessor to allow to modify its options
(filter
, locker
etc.) at runtime using the
FileReadingMessageSource
bean.DirectoryScanner
of this FileReadingMessageSource
.public void setAutoCreateDirectory(boolean autoCreateDirectory)
autoCreateDirectory
- should the directory to be monitored be created when this
component starts up?public void setFilter(FileListFilter<File> filter)
FileListFilter
.
By default a AcceptOnceFileListFilter
with no bounds is used. In most cases a customized FileListFilter
will
be needed to deal with modification and duplication concerns.
If multiple filters are required a
CompositeFileListFilter
can be used to group them together.
The supplied filter must be thread safe..
filter
- a filterpublic void setLocker(FileLocker locker)
FileLocker
to be used to guard files against
duplicate processing.
The supplied FileLocker must be thread safe
locker
- a lockerpublic void setScanEachPoll(boolean scanEachPoll)
By default this implementation will empty its queue before looking at the
directory again. In cases where order is relevant it is important to
consider the effects of setting this flag. The internal
BlockingQueue
that this class is keeping
will more likely be out of sync with the file system if this flag is set
to false
, but it will change more often (causing expensive
reordering) if it is set to true
.
scanEachPoll
- whether or not the component should re-scan (as opposed to not
rescanning until the entire backlog has been delivered)public void setUseWatchService(boolean useWatchService)
FileReadingMessageSource
to use its internal
FileReadingMessageSource.WatchServiceDirectoryScanner
.useWatchService
- the boolean
flag to switch to
FileReadingMessageSource.WatchServiceDirectoryScanner
on true
.setWatchEvents(org.springframework.integration.file.FileReadingMessageSource.WatchEventType...)
public boolean isUseWatchService()
public void setWatchEvents(FileReadingMessageSource.WatchEventType... watchEvents)
watchEvents
- the set of FileReadingMessageSource.WatchEventType
.setUseWatchService(boolean)
public String getComponentType()
getComponentType
in interface NamedComponent
public void start()
start
in interface Lifecycle
start
in interface ManageableLifecycle
public void stop()
stop
in interface Lifecycle
stop
in interface ManageableLifecycle
public boolean isRunning()
isRunning
in interface Lifecycle
isRunning
in interface ManageableLifecycle
protected void onInit()
onInit
in class AbstractExpressionEvaluator
protected AbstractIntegrationMessageBuilder<File> doReceive()
AbstractMessageSource
payload
of
type T, but the returned value may also be a Message
instance whose payload is of type T;
also can be AbstractIntegrationMessageBuilder
which is used for additional headers population.doReceive
in class AbstractMessageSource<File>