The reactive() Endpoint

Starting with version 5.5, the ConsumerEndpointSpec provides a reactive() configuration property with an optional customizer Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>. This option configures the target endpoint as a ReactiveStreamsConsumer instance, independently of the input channel type, which is converted to a Flux via IntegrationReactiveUtils.messageChannelToFlux(). The provided function is used from the Flux.transform() operator to customize (publishOn(), log(), doOnNext() etc.) a reactive stream source from the input channel.

The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that DirectChannel:

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}

See Reactive Streams Support for more information.