Inbound Channel Adapters: Polling Multiple Servers and Directories
Starting with version 5.0.7, the RotatingServerAdvice
is available; when configured as a poller advice, the inbound adapters can poll multiple servers and directories.
Configure the advice and add it to the poller’s advice chain as normal.
A DelegatingSessionFactory
is used to select the server see Delegating Session Factory for more information.
The advice configuration consists of a list of RotationPolicy.KeyDirectory
objects.
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
This advice will poll directory foo
on server one
until no new files exist then move to directory bar
and then directory baz
on server two
, etc.
This default behavior can be modified with the fair
constructor arg:
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
In this case, the advice will move to the next server/directory regardless of whether the previous poll returned a file.
Alternatively, you can provide your own RotationPolicy
to reconfigure the message source as needed:
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
and
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
The local-filename-generator-expression
attribute (localFilenameGeneratorExpression
on the synchronizer) can now contain the #remoteDirectory
variable.
This allows files retrieved from different directories to be downloaded to similar directories locally:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
Do not configure a TaskExecutor on the poller when using this advice; see Conditional Pollers for Message Sources for more information.
|