Inbound Channel Adapters: Polling Multiple Servers and Directories

Starting with version 5.0.7, the RotatingServerAdvice is available; when configured as a poller advice, the inbound adapters can poll multiple servers and directories. Configure the advice and add it to the poller’s advice chain as normal. A DelegatingSessionFactory is used to select the server see Delegating Session Factory for more information. The advice configuration consists of a list of RotationPolicy.KeyDirectory objects.

Example
@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

This advice will poll directory foo on server one until no new files exist then move to directory bar and then directory baz on server two, etc.

This default behavior can be modified with the fair constructor arg:

fair
@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

In this case, the advice will move to the next server/directory regardless of whether the previous poll returned a file.

Alternatively, you can provide your own RotationPolicy to reconfigure the message source as needed:

policy
public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}

and

custom
@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}

The local-filename-generator-expression attribute (localFilenameGeneratorExpression on the synchronizer) can now contain the #remoteDirectory variable. This allows files retrieved from different directories to be downloaded to similar directories locally:

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Sftp.inboundAdapter(sf())
                    .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                    .localDirectory(new File(tmpDir))
                    .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                    .remoteDirectory("."),
                e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
            .channel(MessageChannels.queue("files"))
            .get();
}
Do not configure a TaskExecutor on the poller when using this advice; see Conditional Pollers for Message Sources for more information.

Also see a convenient AbstractRemoteFileStreamingMessageSource.clearFetchedCache() API when not all fetched files are processed withing a single polling cycle, but SessionFactory might be rotated to different one.