Dynamic and Runtime Integration Flows

IntegrationFlow and all its dependent components can be registered at runtime. Before version 5.0, we used the BeanFactory.registerSingleton() hook. Starting in the Spring Framework 5.0, we use the instanceSupplier hook for programmatic BeanDefinition registration. The following example shows how to programmatically register a bean:

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

Note that, in the preceding example, the instanceSupplier hook is the last parameter to the genericBeanDefinition method, provided by a lambda in this case.

All the necessary bean initialization and lifecycle is done automatically, as it is with the standard context configuration bean definitions.

To simplify the development experience, Spring Integration introduced IntegrationFlowContext to register and manage IntegrationFlow instances at runtime, as the following example shows:

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

This is useful when we have multiple configuration options and have to create several instances of similar flows. To do so, we can iterate our options and create and register IntegrationFlow instances within a loop. Another variant is when our source of data is not Spring-based, so we must create it on the fly. Such a sample is Reactive Streams event source, as the following example shows:

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

The IntegrationFlowRegistrationBuilder (as a result of the IntegrationFlowContext.registration()) can be used to specify a bean name for the IntegrationFlow to register, to control its autoStartup, and to register, non-Spring Integration beans. Usually, those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/UDP, and others.), serializers and deserializers, or any other required support components.

You can use the IntegrationFlowRegistration.destroy() callback to remove a dynamically registered IntegrationFlow and all its dependent beans when you no longer need them. See the IntegrationFlowContext Javadoc for more information.

Starting with version 5.0.6, all generated bean names in an IntegrationFlow definition are prepended with the flow ID as a prefix. We recommend always specifying an explicit flow ID. Otherwise, a synchronization barrier is initiated in the IntegrationFlowContext, to generate the bean name for the IntegrationFlow and register its beans. We synchronize on these two operations to avoid a race condition when the same generated bean name may be used for different IntegrationFlow instances.

Also, starting with version 5.0.6, the registration builder API has a new method: useFlowIdAsPrefix(). This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions when components in the flows have the same ID, as the following example shows:

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

In this case, the message handler for the first flow can be referenced with bean a name of tcp1.client.handler.

An id attribute is required when you usE useFlowIdAsPrefix().