Dynamic and Runtime Integration Flows
IntegrationFlow
and all its dependent components can be registered at runtime.
Before version 5.0, we used the BeanFactory.registerSingleton()
hook.
Starting in the Spring Framework 5.0
, we use the instanceSupplier
hook for programmatic BeanDefinition
registration.
The following example shows how to programmatically register a bean:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
Note that, in the preceding example, the instanceSupplier
hook is the last parameter to the genericBeanDefinition
method, provided by a lambda in this case.
All the necessary bean initialization and lifecycle is done automatically, as it is with the standard context configuration bean definitions.
To simplify the development experience, Spring Integration introduced IntegrationFlowContext
to register and manage IntegrationFlow
instances at runtime, as the following example shows:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
This is useful when we have multiple configuration options and have to create several instances of similar flows.
To do so, we can iterate our options and create and register IntegrationFlow
instances within a loop.
Another variant is when our source of data is not Spring-based, so we must create it on the fly.
Such a sample is Reactive Streams event source, as the following example shows:
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
The IntegrationFlowRegistrationBuilder
(as a result of the IntegrationFlowContext.registration()
) can be used to specify a bean name for the IntegrationFlow
to register, to control its autoStartup
, and to register, non-Spring Integration beans.
Usually, those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/UDP, and others.), serializers and deserializers, or any other required support components.
You can use the IntegrationFlowRegistration.destroy()
callback to remove a dynamically registered IntegrationFlow
and all its dependent beans when you no longer need them.
See the IntegrationFlowContext
Javadoc for more information.
Starting with version 5.0.6, all generated bean names in an IntegrationFlow definition are prepended with the flow ID as a prefix.
We recommend always specifying an explicit flow ID.
Otherwise, a synchronization barrier is initiated in the IntegrationFlowContext , to generate the bean name for the IntegrationFlow and register its beans.
We synchronize on these two operations to avoid a race condition when the same generated bean name may be used for different IntegrationFlow instances.
|
Also, starting with version 5.0.6, the registration builder API has a new method: useFlowIdAsPrefix()
.
This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions when components in the flows have the same ID, as the following example shows:
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
In this case, the message handler for the first flow can be referenced with bean a name of tcp1.client.handler
.
An id attribute is required when you usE useFlowIdAsPrefix() .
|