org.springframework.integration.file
Class FileReadingMessageSource

java.lang.Object
  extended by org.springframework.integration.file.FileReadingMessageSource
All Implemented Interfaces:
org.springframework.beans.factory.InitializingBean, MessageSource<java.io.File>

public class FileReadingMessageSource
extends java.lang.Object
implements MessageSource<java.io.File>, org.springframework.beans.factory.InitializingBean

MessageSource that creates messages from a file system directory. To prevent messages for certain files, you may supply a FileListFilter. By default, an AcceptOnceFileListFilter is used. It ensures files are picked up only once from the directory.

A common problem with reading files is that a file may be detected before it is ready. The default AcceptOnceFileListFilter does not prevent this. In most cases, this can be prevented if the file-writing process renames each file as soon as it is ready for reading. A pattern-matching filter that accepts only files that are ready (e.g. based on a known suffix), composed with the default AcceptOnceFileListFilter would allow for this. See CompositeFileListFilter for a way to do this.

A Comparator can be used to ensure internal ordering of the Files in a PriorityBlockingQueue. This does not provide the same guarantees as a Resequencer, but in cases where writing files and failure downstream are rare it might be sufficient.

FileReadingMessageSource is fully thread-safe under concurrent receive() invocations and message delivery callbacks.

Author:
Iwein Fuld, Mark Fisher

Constructor Summary
FileReadingMessageSource()
          Creates a FileReadingMessageSource with a naturally ordered queue.
FileReadingMessageSource(java.util.Comparator<java.io.File> receptionOrderComparator)
          Creates a FileReadingMessageSource with a PriorityBlockingQueue ordered with the passed in Comparator

No guarantees about file delivery order can be made under concurrent access.

 
Method Summary
 void afterPropertiesSet()
           
 void onFailure(Message<java.io.File> failedMessage, java.lang.Throwable t)
          Adds the failed message back to the 'toBeReceived' queue.
 void onSend(Message<java.io.File> sentMessage)
          The message is just logged.
 Message<java.io.File> receive()
          Retrieve the next available message from this source.
 void setAutoCreateDirectory(boolean autoCreateDirectory)
          Specify whether to create the source directory automatically if it does not yet exist upon initialization.
 void setDirectory(java.io.File directory)
          Specify the input directory.
 void setFilter(FileListFilter filter)
          Sets a FileListFilter.
 void setLocker(FileLocker locker)
          Optional.
 void setScanEachPoll(boolean scanEachPoll)
          Optional.
 void setScanner(DirectoryScanner scanner)
          Optionally specify a custom scanner, for example the RecursiveLeafOnlyDirectoryScanner
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

FileReadingMessageSource

public FileReadingMessageSource()
Creates a FileReadingMessageSource with a naturally ordered queue.


FileReadingMessageSource

public FileReadingMessageSource(java.util.Comparator<java.io.File> receptionOrderComparator)
Creates a FileReadingMessageSource with a PriorityBlockingQueue ordered with the passed in Comparator

No guarantees about file delivery order can be made under concurrent access.

Method Detail

setDirectory

public void setDirectory(java.io.File directory)
Specify the input directory.


setScanner

public void setScanner(DirectoryScanner scanner)
Optionally specify a custom scanner, for example the RecursiveLeafOnlyDirectoryScanner


setAutoCreateDirectory

public void setAutoCreateDirectory(boolean autoCreateDirectory)
Specify whether to create the source directory automatically if it does not yet exist upon initialization. By default, this value is true. If set to false and the source directory does not exist, an Exception will be thrown upon initialization.


setFilter

public void setFilter(FileListFilter filter)
Sets a FileListFilter. By default a AcceptOnceFileListFilter with no bounds is used. In most cases a customized FileListFilter will be needed to deal with modification and duplication concerns. If multiple filters are required a CompositeFileListFilter can be used to group them together.

The supplied filter must be thread safe..


setLocker

public void setLocker(FileLocker locker)
Optional. Sets a FileLocker to be used instead of the default NoopFileLocker. Note that the locker is not queried by this FileReadingMessageSource: integration with a FileListFilter is an external concern.

The supplied FileLocker must be thread safe


setScanEachPoll

public void setScanEachPoll(boolean scanEachPoll)
Optional. Set this flag if you want to make sure the internal queue is refreshed with the latest content of the input directory on each poll.

By default this implementation will empty its queue before looking at the directory again. In cases where order is relevant it is important to consider the effects of setting this flag. The internal PriorityBlockingQueue that this class is keeping will more likely be out of sync with the filesystem if this flag is set to false, but it will change more often (causing reordering) if it is set to true.


afterPropertiesSet

public final void afterPropertiesSet()
Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean

receive

public Message<java.io.File> receive()
                              throws MessagingException
Description copied from interface: MessageSource
Retrieve the next available message from this source. Returns null if no message is available.

Specified by:
receive in interface MessageSource<java.io.File>
Throws:
MessagingException

onFailure

public void onFailure(Message<java.io.File> failedMessage,
                      java.lang.Throwable t)
Adds the failed message back to the 'toBeReceived' queue.


onSend

public void onSend(Message<java.io.File> sentMessage)
The message is just logged. It was already removed from the queue during the call to receive()