Class FileReadingMessageSource
- All Implemented Interfaces:
Aware
,BeanFactoryAware
,BeanNameAware
,DisposableBean
,InitializingBean
,Lifecycle
,MessageSource<File>
,IntegrationPattern
,NamedComponent
,IntegrationInboundManagement
,IntegrationManagement
,ManageableLifecycle
MessageSource
that creates messages
from a file system directory.
To prevent messages for certain files, you may supply a FileListFilter
.
By default, when configuring with XML or the DSL,
an AcceptOnceFileListFilter
is used.
It ensures files are picked up only once from the directory.
A common problem with reading files is that a file may be detected before it
is ready. The default
AcceptOnceFileListFilter
does not prevent this. In most cases, this can be prevented if the
file-writing process renames each file as soon as it is ready for reading. A
pattern-matching filter that accepts only files that are ready (e.g. based on
a known suffix), composed with the default
AcceptOnceFileListFilter
would allow for this.
If a external DirectoryScanner
is used, then the FileLocker
and FileListFilter
objects should be set on the external
DirectoryScanner
, not the instance of FileReadingMessageSource. An
IllegalStateException
will result otherwise.
A Comparator
can be used to ensure internal ordering of the Files in
a PriorityBlockingQueue
. This does not provide the same guarantees as
a ResequencingMessageGroupProcessor
,
but in cases where writing files
and failure downstream are rare it might be sufficient.
FileReadingMessageSource is fully thread-safe under concurrent
receive()
invocations and message delivery callbacks.
- Author:
- Iwein Fuld, Mark Fisher, Oleg Zhurakousky, Gary Russell, Artem Bilan, Steven Pearce, Patryk Ziobron
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields inherited from class org.springframework.integration.util.AbstractExpressionEvaluator
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
-
Constructor Summary
ConstructorDescriptionCreate a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.FileReadingMessageSource
(int internalQueueCapacity) Create a FileReadingMessageSource with a bounded queue of the given capacity.FileReadingMessageSource
(Comparator<File> receptionOrderComparator) Create a FileReadingMessageSource with aPriorityBlockingQueue
ordered with the passed inComparator
. -
Method Summary
Modifier and TypeMethodDescriptionprotected AbstractIntegrationMessageBuilder<File>
Subclasses must implement this method.Thescanner
property accessor to allow to modify its options (filter
,locker
etc.) at runtime using theFileReadingMessageSource
bean.boolean
boolean
void
Adds the failed message back to the 'toBeReceived' queue if there is room.protected void
onInit()
void
setAutoCreateDirectory
(boolean autoCreateDirectory) Specify whether to create the source directory automatically if it does not yet exist upon initialization.void
setDirectory
(File directory) Specify the input directory.void
setFilter
(FileListFilter<File> filter) Set aFileListFilter
.void
setLocker
(FileLocker locker) Set aFileLocker
to be used to guard files against duplicate processing.void
setScanEachPoll
(boolean scanEachPoll) Set this flag if you want to make sure the internal queue is refreshed with the latest content of the input directory on each poll.void
setScanner
(DirectoryScanner scanner) Optionally specify a custom scanner, for example theFileReadingMessageSource.WatchServiceDirectoryScanner
.void
setUseWatchService
(boolean useWatchService) Switch thisFileReadingMessageSource
to use its internalFileReadingMessageSource.WatchServiceDirectoryScanner
.void
setWatchDirPredicate
(Predicate<Path> watchDirPredicate) Set aPredicate
to check a directory in theFiles.walkFileTree(Path, Set, int, FileVisitor)
call if it is eligible forWatchService
.void
setWatchEvents
(FileReadingMessageSource.WatchEventType... watchEvents) TheWatchService
event types.void
setWatchMaxDepth
(int watchMaxDepth) Set a max depth for theFiles.walkFileTree(Path, Set, int, FileVisitor)
API whenuseWatchService
is enabled.void
start()
void
stop()
Methods inherited from class org.springframework.integration.endpoint.AbstractMessageSource
buildMessage, destroy, getBeanName, getComponentName, getManagedName, getManagedType, getOverrides, isLoggingEnabled, receive, registerMetricsCaptor, setBeanName, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedType
Methods inherited from class org.springframework.integration.util.AbstractExpressionEvaluator
afterPropertiesSet, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, getBeanFactory, getEvaluationContext, getEvaluationContext, getMessageBuilderFactory, setBeanFactory, setConversionService
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs, isObserved, registerObservationRegistry
Methods inherited from interface org.springframework.integration.core.MessageSource
getIntegrationPatternType
-
Constructor Details
-
FileReadingMessageSource
public FileReadingMessageSource()Create a FileReadingMessageSource with a naturally ordered queue of unbounded capacity. -
FileReadingMessageSource
public FileReadingMessageSource(int internalQueueCapacity) Create a FileReadingMessageSource with a bounded queue of the given capacity. This can be used to reduce the memory footprint of this component when reading from a large directory.- Parameters:
internalQueueCapacity
- the size of the queue used to cache files to be received internally. This queue can be made larger to optimize the directory scanning. With scanEachPoll set to false and the queue to a large size, it will be filled once and then completely emptied before a new directory listing is done. This is particularly useful to reduce scans of large numbers of files in a directory.
-
FileReadingMessageSource
Create a FileReadingMessageSource with aPriorityBlockingQueue
ordered with the passed inComparator
.The size of the queue used should be large enough to hold all the files in the input directory in order to sort all of them, so restricting the size of the queue is mutually exclusive with ordering. No guarantees about file delivery order can be made under concurrent access.
- Parameters:
receptionOrderComparator
- the comparator to be used to order the files in the internal queue
-
-
Method Details
-
setDirectory
Specify the input directory.- Parameters:
directory
- to monitor
-
setScanner
Optionally specify a custom scanner, for example theFileReadingMessageSource.WatchServiceDirectoryScanner
.- Parameters:
scanner
- scanner implementation
-
getScanner
Thescanner
property accessor to allow to modify its options (filter
,locker
etc.) at runtime using theFileReadingMessageSource
bean.- Returns:
- the
DirectoryScanner
of thisFileReadingMessageSource
. - Since:
- 4.2
-
setAutoCreateDirectory
public void setAutoCreateDirectory(boolean autoCreateDirectory) Specify whether to create the source directory automatically if it does not yet exist upon initialization. By default, this value is true. If set to false and the source directory does not exist, an Exception will be thrown upon initialization.- Parameters:
autoCreateDirectory
- should the directory to be monitored be created when this component starts up?
-
setFilter
Set aFileListFilter
. By default aAcceptOnceFileListFilter
with no bounds is used. In most cases a customizedFileListFilter
will be needed to deal with modification and duplication concerns. If multiple filters are required aCompositeFileListFilter
can be used to group them together.The supplied filter must be thread safe..
- Parameters:
filter
- a filter
-
setLocker
Set aFileLocker
to be used to guard files against duplicate processing.The supplied FileLocker must be thread safe
- Parameters:
locker
- a locker
-
setScanEachPoll
public void setScanEachPoll(boolean scanEachPoll) Set this flag if you want to make sure the internal queue is refreshed with the latest content of the input directory on each poll.By default, this implementation will empty its queue before looking at the directory again. In cases where order is relevant it is important to consider the effects of setting this flag. The internal
BlockingQueue
that this class is keeping will more likely be out of sync with the file system if this flag is set to false, but it will change more often (causing expensive reordering) if it is set to true.- Parameters:
scanEachPoll
- whether the component should re-scan (as opposed to not rescanning until the entire backlog has been delivered)
-
setUseWatchService
public void setUseWatchService(boolean useWatchService) Switch thisFileReadingMessageSource
to use its internalFileReadingMessageSource.WatchServiceDirectoryScanner
.- Parameters:
useWatchService
- theboolean
flag to switch toFileReadingMessageSource.WatchServiceDirectoryScanner
ontrue
.- Since:
- 4.3
- See Also:
-
isUseWatchService
public boolean isUseWatchService() -
setWatchEvents
- Parameters:
watchEvents
- the set ofFileReadingMessageSource.WatchEventType
.- Since:
- 4.3
- See Also:
-
setWatchMaxDepth
public void setWatchMaxDepth(int watchMaxDepth) Set a max depth for theFiles.walkFileTree(Path, Set, int, FileVisitor)
API whenuseWatchService
is enabled. Defaults toInteger.MAX_VALUE
- walk the whole tree.- Parameters:
watchMaxDepth
- the depth forFiles.walkFileTree(Path, Set, int, FileVisitor)
.- Since:
- 6.1
-
setWatchDirPredicate
Set aPredicate
to check a directory in theFiles.walkFileTree(Path, Set, int, FileVisitor)
call if it is eligible forWatchService
.- Parameters:
watchDirPredicate
- thePredicate
to check dirs for walking.- Since:
- 6.1
-
getComponentType
- Specified by:
getComponentType
in interfaceNamedComponent
-
start
public void start()- Specified by:
start
in interfaceLifecycle
- Specified by:
start
in interfaceManageableLifecycle
-
stop
public void stop()- Specified by:
stop
in interfaceLifecycle
- Specified by:
stop
in interfaceManageableLifecycle
-
isRunning
public boolean isRunning()- Specified by:
isRunning
in interfaceLifecycle
- Specified by:
isRunning
in interfaceManageableLifecycle
-
onInit
protected void onInit()- Overrides:
onInit
in classAbstractExpressionEvaluator
-
doReceive
Description copied from class:AbstractMessageSource
Subclasses must implement this method. Typically the returned value will be thepayload
of type T, but the returned value may also be aMessage
instance whose payload is of type T; also can beAbstractIntegrationMessageBuilder
which is used for additional headers population.- Specified by:
doReceive
in classAbstractMessageSource<File>
- Returns:
- The value returned.
-
onFailure
Adds the failed message back to the 'toBeReceived' queue if there is room.- Parameters:
failedMessage
- theMessage
that failed
-