org.springframework.integration.file
Class FileReadingMessageSource

java.lang.Object
  extended by org.springframework.integration.context.IntegrationObjectSupport
      extended by org.springframework.integration.file.FileReadingMessageSource
All Implemented Interfaces:
BeanFactoryAware, BeanNameAware, InitializingBean, NamedComponent, MessageSource<java.io.File>

public class FileReadingMessageSource
extends IntegrationObjectSupport
implements MessageSource<java.io.File>

MessageSource that creates messages from a file system directory. To prevent messages for certain files, you may supply a FileListFilter. By default, an AcceptOnceFileListFilter is used. It ensures files are picked up only once from the directory.

A common problem with reading files is that a file may be detected before it is ready. The default AcceptOnceFileListFilter does not prevent this. In most cases, this can be prevented if the file-writing process renames each file as soon as it is ready for reading. A pattern-matching filter that accepts only files that are ready (e.g. based on a known suffix), composed with the default AcceptOnceFileListFilter, would allow for this.
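The composition described above can be illustrated with a JDK-only sketch. It does not use the Spring Integration FileListFilter API; the class name ReadyOnceFilterSketch, its filter method, and the ".ready" suffix used in the usage note are hypothetical names chosen for illustration.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical JDK-only sketch; not the Spring Integration FileListFilter API.
final class ReadyOnceFilterSketch {

    // Files already accepted; a concurrent set keeps the sketch safe to share between threads.
    private final Set<File> seen = ConcurrentHashMap.newKeySet();
    private final String readySuffix;

    ReadyOnceFilterSketch(String readySuffix) {
        this.readySuffix = readySuffix;
    }

    // Accepts a file only if it carries the ready suffix and was not accepted before.
    List<File> filter(File[] candidates) {
        List<File> accepted = new ArrayList<>();
        for (File candidate : candidates) {
            // The suffix check runs first, so an unfinished file is never recorded as seen.
            if (candidate.getName().endsWith(readySuffix) && seen.add(candidate)) {
                accepted.add(candidate);
            }
        }
        return accepted;
    }
}
```

With a writer that renames "b.csv.writing" to "b.csv.ready" once it is complete, the first listing yields only the files that are already renamed, and a later listing yields "b.csv.ready" exactly once.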

A Comparator can be used to ensure internal ordering of the Files in a PriorityBlockingQueue. This does not provide the same guarantees as a ResequencingMessageGroupProcessor, but in cases where writing files and failure downstream are rare it might be sufficient.

FileReadingMessageSource is fully thread-safe under concurrent receive() invocations and message delivery callbacks.


Constructor Summary
FileReadingMessageSource()
          Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
FileReadingMessageSource(java.util.Comparator<java.io.File> receptionOrderComparator)
          Creates a FileReadingMessageSource with a PriorityBlockingQueue ordered with the passed in Comparator.

The size of the queue used should be large enough to hold all the files in the input directory in order to sort all of them, so restricting the size of the queue is mutually exclusive with ordering.

FileReadingMessageSource(int internalQueueCapacity)
          Creates a FileReadingMessageSource with a bounded queue of the given capacity.
 
Method Summary
 java.lang.String getComponentType()
          Subclasses may implement this method to provide component type information.
 void onFailure(Message<java.io.File> failedMessage)
          Adds the failed message back to the 'toBeReceived' queue if there is room.
protected  void onInit()
          Subclasses may implement this for initialization logic.
 void onSend(Message<java.io.File> sentMessage)
          The message is just logged.
 Message<java.io.File> receive()
          Retrieve the next available message from this source.
 void setAutoCreateDirectory(boolean autoCreateDirectory)
          Specify whether to create the source directory automatically if it does not yet exist upon initialization.
 void setDirectory(java.io.File directory)
          Specify the input directory.
 void setFilter(FileListFilter<java.io.File> filter)
          Sets a FileListFilter.
 void setLocker(FileLocker locker)
          Optional.
 void setScanEachPoll(boolean scanEachPoll)
          Optional.
 void setScanner(DirectoryScanner scanner)
          Optionally specify a custom scanner, for example the RecursiveLeafOnlyDirectoryScanner.
 
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, getBeanFactory, getComponentName, getConversionService, getTaskScheduler, setBeanFactory, setBeanName, setComponentName, setConversionService, setTaskScheduler, toString
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Constructor Detail

FileReadingMessageSource

public FileReadingMessageSource()
Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.


FileReadingMessageSource

public FileReadingMessageSource(int internalQueueCapacity)
Creates a FileReadingMessageSource with a bounded queue of the given capacity. This can be used to reduce the memory footprint of this component when reading from a large directory.

Parameters:
internalQueueCapacity - the size of the queue used to cache files to be received internally. This queue can be made larger to optimize the directory scanning. With scanEachPoll set to false and the queue to a large size, it will be filled once and then completely emptied before a new directory listing is done. This is particularly useful to reduce scans of large numbers of files in a directory.

FileReadingMessageSource

public FileReadingMessageSource(java.util.Comparator<java.io.File> receptionOrderComparator)
Creates a FileReadingMessageSource with a PriorityBlockingQueue ordered with the passed in Comparator.

The size of the queue used should be large enough to hold all the files in the input directory in order to sort all of them, so restricting the size of the queue is mutually exclusive with ordering. No guarantees about file delivery order can be made under concurrent access.

Parameters:
receptionOrderComparator - the comparator to be used to order the files in the internal queue
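
As a rough illustration of comparator-driven ordering, the following JDK-only sketch builds a PriorityBlockingQueue of files ordered by name. The class OrderedFileQueueSketch and its methods are hypothetical and do not reflect how this class is implemented internally; only java.util.concurrent.PriorityBlockingQueue and java.util.Comparator are real JDK types.

```java
import java.io.File;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical JDK-only sketch of a comparator-ordered file queue.
final class OrderedFileQueueSketch {

    // Builds a queue that hands out files in the order defined by the comparator.
    static PriorityBlockingQueue<File> newQueue(Comparator<File> order) {
        // 11 is only the initial capacity; a PriorityBlockingQueue is unbounded.
        return new PriorityBlockingQueue<>(11, order);
    }

    // Example ordering (an assumption for this sketch): by file name.
    static Comparator<File> byName() {
        return Comparator.comparing(File::getName);
    }
}
```

Because the queue is unbounded, every file that is added takes part in the ordering, which is why a capacity limit and a comparator cannot be combined. A comparator such as byName() could presumably be passed to this constructor in the same way.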
Method Detail

setDirectory

public void setDirectory(java.io.File directory)
Specify the input directory.

Parameters:
directory - to monitor

setScanner

public void setScanner(DirectoryScanner scanner)
Optionally specify a custom scanner, for example the RecursiveLeafOnlyDirectoryScanner.

Parameters:
scanner - scanner implementation

setAutoCreateDirectory

public void setAutoCreateDirectory(boolean autoCreateDirectory)
Specify whether to create the source directory automatically if it does not yet exist upon initialization. By default, this value is true. If set to false and the source directory does not exist, an Exception will be thrown upon initialization.

Parameters:
autoCreateDirectory - should the directory to be monitored be created when this component starts up?

setFilter

public void setFilter(FileListFilter<java.io.File> filter)
Sets a FileListFilter. By default an AcceptOnceFileListFilter with no bounds is used. In most cases a customized FileListFilter will be needed to deal with modification and duplication concerns. If multiple filters are required, a CompositeFileListFilter can be used to group them together.

The supplied filter must be thread safe.

Parameters:
filter - a filter

setLocker

public void setLocker(FileLocker locker)
Optional. Sets a FileLocker to be used to guard files against duplicate processing.

The supplied FileLocker must be thread safe.

Parameters:
locker - a locker

setScanEachPoll

public void setScanEachPoll(boolean scanEachPoll)
Optional. Set this flag if you want to make sure the internal queue is refreshed with the latest content of the input directory on each poll.

By default this implementation will empty its queue before looking at the directory again. In cases where order is relevant it is important to consider the effects of setting this flag. The internal BlockingQueue that this class is keeping will more likely be out of sync with the file system if this flag is set to false, but it will change more often (causing expensive reordering) if it is set to true.

Parameters:
scanEachPoll - whether or not the component should re-scan (as opposed to not rescanning until the entire backlog has been delivered)
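
The trade-off between the two settings can be seen in a small single-threaded simulation. This is a sketch under stated assumptions, not the actual implementation: ScanPolicySketch, its scans counter, and the Supplier that stands in for "scan the directory and apply the filter" are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical, single-threaded simulation of the two scanning policies.
final class ScanPolicySketch {

    private final Queue<String> toBeReceived = new ArrayDeque<>();
    // Stands in for a directory scan that returns only files not delivered before.
    private final Supplier<List<String>> newFiles;
    private final boolean scanEachPoll;
    int scans = 0; // number of directory listings performed so far

    ScanPolicySketch(Supplier<List<String>> newFiles, boolean scanEachPoll) {
        this.newFiles = newFiles;
        this.scanEachPoll = scanEachPoll;
    }

    // Returns the next file name, or null if none is available.
    String receive() {
        // Rescan on every poll, or only once the backlog has been fully delivered.
        if (scanEachPoll || toBeReceived.isEmpty()) {
            scans++;
            toBeReceived.addAll(newFiles.get());
        }
        return toBeReceived.poll();
    }
}
```

With two files in the directory and three calls to receive(), this sketch performs two listings when scanEachPoll is false and three when it is true.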

getComponentType

public java.lang.String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.

Specified by:
getComponentType in interface NamedComponent
Overrides:
getComponentType in class IntegrationObjectSupport

onInit

protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.

Overrides:
onInit in class IntegrationObjectSupport

receive

public Message<java.io.File> receive()
                              throws MessagingException
Description copied from interface: MessageSource
Retrieve the next available message from this source. Returns null if no message is available.

Specified by:
receive in interface MessageSource<java.io.File>
Throws:
MessagingException

onFailure

public void onFailure(Message<java.io.File> failedMessage)
Adds the failed message back to the 'toBeReceived' queue if there is room.

Parameters:
failedMessage - the Message that failed
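
The documentation does not show how the re-queue is performed. One way to obtain "only if there is room" behavior with a bounded queue is the non-blocking BlockingQueue.offer call, sketched below with the hypothetical helper RequeueSketch.requeue.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch; the actual onFailure implementation may differ.
final class RequeueSketch {

    // Returns true if the failed item was put back, false if the bounded queue was full.
    static <T> boolean requeue(BlockingQueue<T> toBeReceived, T failed) {
        // offer() never blocks: it returns false instead of waiting for free space.
        return toBeReceived.offer(failed);
    }
}
```

In this sketch, an item that does not fit is dropped from the queue rather than blocking the caller.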

onSend

public void onSend(Message<java.io.File> sentMessage)
The message is just logged. It was already removed from the queue during the call to receive().

Parameters:
sentMessage - the message that was successfully delivered