The reactive()
Endpoint
Starting with version 5.5, the ConsumerEndpointSpec
provides a reactive()
configuration property with an optional customizer Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
.
This option configures the target endpoint as a ReactiveStreamsConsumer
instance, independently of the input channel type, which is converted to a Flux
via IntegrationReactiveUtils.messageChannelToFlux()
.
The provided function is used from the Flux.transform()
operator to customize (publishOn()
, log()
, doOnNext()
etc.) a reactive stream source from the input channel.
The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that DirectChannel
:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
See Reactive Streams Support for more information.