Class FileReadingMessageSource

All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, Lifecycle, MessageSource<File>, IntegrationPattern, NamedComponent, IntegrationInboundManagement, IntegrationManagement, ManageableLifecycle

public class FileReadingMessageSource extends AbstractMessageSource<File> implements ManageableLifecycle
MessageSource that creates messages from a file system directory. To prevent messages for certain files, you may supply a FileListFilter. By default, when configuring with XML or the DSL, an AcceptOnceFileListFilter is used. It ensures files are picked up only once from the directory.

A common problem with reading files is that a file may be detected before it is ready. The default AcceptOnceFileListFilter does not prevent this. In most cases, this can be prevented if the file-writing process renames each file as soon as it is ready for reading. A pattern-matching filter that accepts only files that are ready (e.g. based on a known suffix), composed with the default AcceptOnceFileListFilter would allow for this.

If a external DirectoryScanner is used, then the FileLocker and FileListFilter objects should be set on the external DirectoryScanner, not the instance of FileReadingMessageSource. An IllegalStateException will result otherwise.

A Comparator can be used to ensure internal ordering of the Files in a PriorityBlockingQueue. This does not provide the same guarantees as a ResequencingMessageGroupProcessor, but in cases where writing files and failure downstream are rare it might be sufficient.

FileReadingMessageSource is fully thread-safe under concurrent receive() invocations and message delivery callbacks.

Iwein Fuld, Mark Fisher, Oleg Zhurakousky, Gary Russell, Artem Bilan, Steven Pearce, Patryk Ziobron
  • Constructor Details

    • FileReadingMessageSource

      public FileReadingMessageSource()
      Create a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
    • FileReadingMessageSource

      public FileReadingMessageSource(int internalQueueCapacity)
      Create a FileReadingMessageSource with a bounded queue of the given capacity. This can be used to reduce the memory footprint of this component when reading from a large directory.
      internalQueueCapacity - the size of the queue used to cache files to be received internally. This queue can be made larger to optimize the directory scanning. With scanEachPoll set to false and the queue to a large size, it will be filled once and then completely emptied before a new directory listing is done. This is particularly useful to reduce scans of large numbers of files in a directory.
    • FileReadingMessageSource

      public FileReadingMessageSource(@Nullable Comparator<File> receptionOrderComparator)
      Create a FileReadingMessageSource with a PriorityBlockingQueue ordered with the passed in Comparator.

      The size of the queue used should be large enough to hold all the files in the input directory in order to sort all of them, so restricting the size of the queue is mutually exclusive with ordering. No guarantees about file delivery order can be made under concurrent access.

      receptionOrderComparator - the comparator to be used to order the files in the internal queue
  • Method Details

    • setDirectory

      public void setDirectory(File directory)
      Specify the input directory.
      directory - to monitor
    • setScanner

      public void setScanner(DirectoryScanner scanner)
      Optionally specify a custom scanner, for example the FileReadingMessageSource.WatchServiceDirectoryScanner.
      scanner - scanner implementation
    • getScanner

      public DirectoryScanner getScanner()
      The scanner property accessor to allow to modify its options (filter, locker etc.) at runtime using the FileReadingMessageSource bean.
      the DirectoryScanner of this FileReadingMessageSource.
    • setAutoCreateDirectory

      public void setAutoCreateDirectory(boolean autoCreateDirectory)
      Specify whether to create the source directory automatically if it does not yet exist upon initialization. By default, this value is true. If set to false and the source directory does not exist, an Exception will be thrown upon initialization.
      autoCreateDirectory - should the directory to be monitored be created when this component starts up?
    • setFilter

      public void setFilter(FileListFilter<File> filter)
      Set a FileListFilter. By default a AcceptOnceFileListFilter with no bounds is used. In most cases a customized FileListFilter will be needed to deal with modification and duplication concerns. If multiple filters are required a CompositeFileListFilter can be used to group them together.

      The supplied filter must be thread safe..

      filter - a filter
    • setLocker

      public void setLocker(FileLocker locker)
      Set a FileLocker to be used to guard files against duplicate processing.

      The supplied FileLocker must be thread safe

      locker - a locker
    • setScanEachPoll

      public void setScanEachPoll(boolean scanEachPoll)
      Set this flag if you want to make sure the internal queue is refreshed with the latest content of the input directory on each poll.

      By default, this implementation will empty its queue before looking at the directory again. In cases where order is relevant it is important to consider the effects of setting this flag. The internal BlockingQueue that this class is keeping will more likely be out of sync with the file system if this flag is set to false, but it will change more often (causing expensive reordering) if it is set to true.

      scanEachPoll - whether the component should re-scan (as opposed to not rescanning until the entire backlog has been delivered)
    • setUseWatchService

      public void setUseWatchService(boolean useWatchService)
      Switch this FileReadingMessageSource to use its internal FileReadingMessageSource.WatchServiceDirectoryScanner.
      useWatchService - the boolean flag to switch to FileReadingMessageSource.WatchServiceDirectoryScanner on true.
      See Also:
    • isUseWatchService

      public boolean isUseWatchService()
    • setWatchEvents

      public void setWatchEvents(FileReadingMessageSource.WatchEventType... watchEvents)
      The WatchService event types. If setUseWatchService(boolean) isn't true, this option is ignored.
      watchEvents - the set of FileReadingMessageSource.WatchEventType.
      See Also:
    • setWatchMaxDepth

      public void setWatchMaxDepth(int watchMaxDepth)
      Set a max depth for the Files.walkFileTree(Path, Set, int, FileVisitor) API when useWatchService is enabled. Defaults to Integer.MAX_VALUE - walk the whole tree.
      watchMaxDepth - the depth for Files.walkFileTree(Path, Set, int, FileVisitor).
    • setWatchDirPredicate

      public void setWatchDirPredicate(Predicate<Path> watchDirPredicate)
      Set a Predicate to check a directory in the Files.walkFileTree(Path, Set, int, FileVisitor) call if it is eligible for WatchService.
      watchDirPredicate - the Predicate to check dirs for walking.
    • getComponentType

      public String getComponentType()
      Specified by:
      getComponentType in interface NamedComponent
    • start

      public void start()
      Specified by:
      start in interface Lifecycle
      Specified by:
      start in interface ManageableLifecycle
    • stop

      public void stop()
      Specified by:
      stop in interface Lifecycle
      Specified by:
      stop in interface ManageableLifecycle
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
      Specified by:
      isRunning in interface ManageableLifecycle
    • onInit

      protected void onInit()
      onInit in class AbstractExpressionEvaluator
    • doReceive

      protected AbstractIntegrationMessageBuilder<File> doReceive()
      Description copied from class: AbstractMessageSource
      Subclasses must implement this method. Typically the returned value will be the payload of type T, but the returned value may also be a Message instance whose payload is of type T; also can be AbstractIntegrationMessageBuilder which is used for additional headers population.
      Specified by:
      doReceive in class AbstractMessageSource<File>
      The value returned.
    • onFailure

      public void onFailure(Message<File> failedMessage)
      Adds the failed message back to the 'toBeReceived' queue if there is room.
      failedMessage - the Message that failed