Spring Integration provides support for Reactive Streams interaction in some places of the framework and from different aspects. We will discuss most of them here with appropriate links to the target chapters for details whenever necessary.
To recap, Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns.
Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters.
Spring Integration’s primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code.
This goal is achieved in the target application using first class citizens like
endpoint, which allow us to build an integration flow (pipeline), where (in most cases) one endpoint produces messages into a channel to be consumed by another endpoint.
This way we distinguish an integration interaction model from the target business logic.
The crucial part here is a channel in between: the flow behavior depends from its implementation leaving endpoints untouched.
On the other hand, the Reactive Streams is a standard for asynchronous stream processing with non-blocking back pressure.
The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – like passing elements on to another thread or thread-pool – while ensuring that the receiving side is not forced to buffer arbitrary amounts of data.
In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded.
The intention of Reactive Streams implementation, such as Project Reactor, is to preserve these benefits and characteristics across the whole processing graph of a stream application.
The ultimate goal of Reactive Streams libraries is to provide types, set of operators and supporting API for a target application in a transparent and smooth manner as is possible with available programming language structure, but the final solution is not as imperative as it is with a normal function chain invocation.
It is divided into to phases: definition and execution, which happens some time later during subscription to the final reactive publisher, and demand for data is pushed from the bottom of the definition to the top applying back-pressure as needed - we request as many events as we can handle at the moment.
The reactive application looks like a
"stream" or as we got used to in Spring Integration terms -
In fact the Reactive Streams SPI since Java 9 is presented in the
From here it may look like Spring Integration flows are really a good fit for writing Reactive Streams applications when we apply some reactive framework operators on endpoints, but in fact the problems is much broader and we need to keep in mind that not all endpoints (e.g.
JdbcMessageHandler) can be processed in a reactive stream transparently.
Of course, the main goal for Reactive Streams support in Spring Integration is to allow the whole process to be fully reactive, on demand initiated and back-pressure ready.
It is not going to be possible until the target protocols and systems for channel adapters provide a Reactive Streams interaction model.
In the sections below we will describe what components and approaches are provided in Spring Integration for developing reactive application preserving integration flow structures.
All the Reactive Streams interaction in Spring Integration implemented with Project Reactor types, such as
The simplest point of interaction with Reactive Streams is a
@MessagingGateway where we just make a return type of the gateway method as a
Mono<?> - and the whole integration flow behind a gateway method call is going to be performed when a subscription happens on the returned
Mono for more information.
Mono-reply approach is used in the framework internally for inbound gateways which are fully based on Reactive Streams compatible protocols (see Reactive Channel Adapters below for more information).
The send-and-receive operation is wrapped into a
Mono.deffer() with chaining a reply evaluation from the
replyChannel header whenever it is available.
This way an inbound component for the particular reactive protocol (e.g. Netty) is going to be as a subscriber and initiator for a reactive flow performed on the Spring Integration.
If the request payload is a reactive type, it would be better to handle it withing a reactive stream definition deferring a process to the initiator subscription.
For this purpose a handler method must return a reactive type as well.
See the next section for more information.
When a reply producing
MessageHandler returns a reactive type payload for a reply message, it is processed in an asynchronous manner with a regular
MessageChannel implementation provided for the
outputChannel and flattened with on demand subscription when the output channel is a
ReactiveStreamsSubscribableChannel implementation, e.g.
With a standard imperative
MessageChannel use-case, and if a reply payload is a multi-value publisher (see
ReactiveAdapter.isMultiValue() for more information), it is wrapped into a
A result of this, the
Mono has to be subscribed explicitly downstream or flattened by the
ReactiveStreamsSubscribableChannel for the
outputChannel, there is no need to be concerned about return type and subscription; everything is processed smoothly by the framework internally.
See Asynchronous Service Activator for more information.
FluxMessageChannel is a combined implementation of
Flux, as a hot source, is created internally for sinking incoming messages from the
Publisher.subscribe() implementation is delegated to that internal
Also, for on demand upstream consumption, the
FluxMessageChannel provides an implementation for the
Publisher (see Source Polling Channel Adapter and splitter below, for example) provided for this channel is auto-subscribed when subscription is ready for this channel.
Events from this delegating publishers are sunk into an internal
Flux mentioned above.
A consumer for the
FluxMessageChannel must be a
org.reactivestreams.Subscriber instance for honoring the Reactive Streams contract.
Fortunately, all of the
MessageHandler implementations in Spring Integration also implement a
CoreSubscriber from project Reactor.
And thanks to a
ReactiveStreamsConsumer implementation in between, the whole integration flow configuration is left transparent for target developers.
In this case, the flow behavior is changed from an imperative push model to a reactive pull model.
ReactiveStreamsConsumer can also be used to turn any
MessageChannel into a reactive source using
IntegrationReactiveUtils, making an integration flow partially reactive.
FluxMessageChannel for more information.
Starting with version 5.5, the
ConsumerEndpointSpec introduces a
reactive() option to make the endpoint in the flow as a
ReactiveStreamsConsumer independently of the input channel.
Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> can be provided to customise a source
Flux from the input channel via
Flux.transform() operation, e.g. with the
This functionality is represented as a
@Reactive sub-annotation for all the messaging annotation (
@Splitter etc.) via their
SourcePollingChannelAdapter relies on the task which is initiated by the
A polling trigger is built from the provided options and used for periodic scheduling a task to poll a target source of data or events.
outputChannel is a
ReactiveStreamsSubscribableChannel, the same
Trigger is used to determine the next time for execution, but instead of scheduling tasks, the
SourcePollingChannelAdapter creates a
Flux<Message<?>> based on the
Flux.generate() for the
nextExecutionTime values and
Mono.delay() for a duration from the previous step.
Flux.flatMapMany() is used then to poll
maxMessagesPerPoll and sink them into an output
Flux is subscribed by the provided
ReactiveStreamsSubscribableChannel honoring a back-pressure downstream.
Starting with version 5.5, when
maxMessagesPerPoll == 0, the source is not called at all, and
flatMapMany() is completed immediately via a
Mono.empty() result until the
maxMessagesPerPoll is changed to non-zero value at a later time, e.g. via a Control Bus.
This way, any
MessageSource implementation can be turned into a reactive hot source.
See Polling Consumer for more information.
MessageProducerSupport is the base class for event-driven channel adapters and, typically, its
sendMessage(Message<?>) is used as a listener callback in the producing driver API.
This callback can also be easily plugged into the
doOnNext() Reactor operator when a message producer implementation builds a
Flux of messages instead of listener-based functionality.
In fact, this is done in the framework when an
outputChannel of the message producer is not a
However, for improved end-user experience, and to allow more back-pressure ready functionality, the
MessageProducerSupport provides a
subscribeToPublisher(Publisher<? extends Message<?>>) API to be used in the target implementation when a
Publisher<Message<?>>> is the source of data from the target system.
Typically, it is used from the
doStart() implementation when target driver API is called for a
Publisher of source data.
It is recommended to combine a reactive
MessageProducerSupport implementation with a
FluxMessageChannel as the
outputChannel for on-demand subscription and event consumption downstream.
The channel adapter goes to a stopped state when a subscription to the
Publisher is cancelled.
stop() on such a channel adapter completes the producing from the source
The channel adapter can be restarted with automatic subscription to a newly created source
Starting with version 5.3, a
ReactiveMessageSourceProducer is provided.
It is a combination of a provided
MessageSource and event-driven production into the configured
Internally it wraps a
MessageSource into the repeatedly resubscribed
Mono producing a
Flux<Message<?>> to be subscribed in the
subscribeToPublisher(Publisher<? extends Message<?>>) mentioned above.
The subscription for this
Mono is done using
Schedulers.boundedElastic() to avoid possible blocking in the target
When the message source returns
null (no data to pull), the
Mono is turned into a
repeatWhenEmpty() state with a
delay for a subsequent re-subscription based on a
Duration entry from the subscriber context.
By default it is 1 second.
MessageSource produces messages with a
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK information in the headers, it is acknowledged (if necessary) in the
doOnSuccess() of the original
Mono and rejected in the
doOnError() if the downstream flow throws a
MessagingException with the failed message to reject.
ReactiveMessageSourceProducer could be used for any use-case when a a polling channel adapter’s features should be turned into a reactive, on demand solution for any existing
AbstractMessageSplitter gets a
Publisher for its logic, the process goes naturally over the items in the
Publisher to map them into messages for sending to the
If this channel is a
Flux wrapper for the
Publisher is subscribed on demand from that channel and this splitter behavior looks more like a
flatMap Reactor operator, when we map an incoming event into multi-value output
It makes most sense when the whole integration flow is built with a
FluxMessageChannel before and after the splitter, aligning Spring Integration configuration with a Reactive Streams requirements and its operators for event processing.
With a regular channel, a
Publisher is converted into an
Iterable for standard iterate-and-produce splitting logic.
FluxAggregatorMessageHandler is another sample of specific Reactive Streams logic implementation which could be treated as a
"reactive operator" in terms of Project Reactor.
It is based on the
The incoming messages are sunk into a
Flux.create() initiated when a
FluxAggregatorMessageHandler is created, making it as a hot source.
Flux is subscribed to by a
ReactiveStreamsSubscribableChannel on demand, or directly in the
FluxAggregatorMessageHandler.start() when the
outputChannel is not reactive.
MessageHandler has its power, when the whole integration flow is built with a
FluxMessageChannel before and after this component, making the whole logic back-pressure ready.
IntegrationFlow in Java DSL can start from any
Publisher instance (see
Also, with an
IntegrationFlowBuilder.toReactivePublisher() operator, the
IntegrationFlow can be turned into a reactive hot source.
FluxMessageChannel is used internally in both cases; it can subscribe to an inbound
Publisher according to its
ReactiveStreamsSubscribableChannel contract and it is a
Publisher<Message<?>> by itself for downstream subscribers.
With a dynamic
IntegrationFlow registration we can implement a powerful logic combining Reactive Streams with this integration flow bridging to/from
Starting with version 5.5.6, a
toReactivePublisher(boolean autoStartOnSubscribe) operator variant is present to control a lifecycle of the whole
IntegrationFlow behind the returned
Typically, the subscription and consumption from the reactive publisher happens in the later runtime phase, not during reactive stream composition, or even
To avoid boilerplate code for lifecycle management of the
IntegrationFlow at the
Publisher<Message<?>> subscription point and for better end-user experience, this new operator with the
autoStartOnSubscribe flag has been introduced.
It marks (if
IntegrationFlow and its components for
autoStartup = false, so an
ApplicationContext won’t initiate production and consumption of messages in the flow automatically.
start() for the
IntegrationFlow is initiated from the internal
Independently of the
autoStartOnSubscribe value, the flow is stopped from a
Flux.doOnTerminate() - it does not make sense to produce messages if there is nothing to consume them.
For the exact opposite use-case, when
IntegrationFlow should call a reactive stream and continue after completion, a
fluxTransform() operator is provided in the
The flow at this point is turned into a
FluxMessageChannel which is propagated into a provided
fluxFunction, performed in the
A result of the function is wrapped into a
Mono<Message<?>> for flat-mapping into an output
Flux which is subscribed by another
FluxMessageChannel for downstream flow.
See Java DSL Chapter for more information.
Starting with version 5.3, the
ReactiveMessageHandler is supported natively in the framework.
This type of message handler is designed for reactive clients which return a reactive type for on-demand subscription for low-level operation execution and doesn’t provide any reply data to continue a reactive stream composition.
ReactiveMessageHandler is used in the imperative integration flow, the
handleMessage() result in subscribed immediately after return, just because there is no reactive streams composition in such a flow to honor back-pressure.
In this case the framework wraps this
ReactiveMessageHandler into a
ReactiveMessageHandlerAdapter - a plain implementation of
However when a
ReactiveStreamsConsumer is involved in the flow (e.g. when channel to consume is a
FluxMessageChannel), such a
ReactiveMessageHandler is composed to the whole reactive stream with a
flatMap() Reactor operator to honor back-pressure during consumption.
One of the out-of-the-box
ReactiveMessageHandler implementation is a
ReactiveMongoDbStoringMessageHandler for Outbound Channel Adapter.
See MongoDB Reactive Channel Adapters for more information.
When the target protocol for integration provides a Reactive Streams solution, it becomes straightforward to implement channel adapters in Spring Integration.
An inbound, event-driven channel adapter implementation is about wrapping a request (if necessary) into a deferred
Flux and perform a send (and produce reply, if any) only when a protocol component initiates a subscription into a
Mono returned from the listener method.
This way we have a reactive stream solution encapsulated exactly in this component.
Of course, downstream integration flow subscribed on the output channel should honor Reactive Streams specification and be performed in the on demand, back-pressure ready manner.
This is not always available by the nature (or the current implementation) of
MessageHandler processor used in the integration flow.
This limitation can be handled using thread pools and queues or
FluxMessageChannel (see above) before and after integration endpoints when there is no reactive implementation.
A reactive outbound channel adapter implementation is about initiation (or continuation) of a reactive stream to interaction with an external system according provided reactive API for the target protocol. An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of reactive stream on top. A returned reactive type can be subscribed immediately if we are in one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.
Currently Spring Integration provides channel adapter (or gateway) implementations for WebFlux, RSocket, MongoDb and R2DBC.
The Redis Stream Channel Adapters are also reactive and uses
ReactiveStreamOperations from Spring Data.
Also an Apache Cassandra Extension provides a
MessageHandler implementation for the Cassandra reactive driver.
More reactive channel adapters are coming, for example for Apache Kafka in Kafka based on the
ReactiveKafkaConsumerTemplate from Spring for Apache Kafka etc.
For many other non-reactive channel adapters thread pools are recommended to avoid blocking during reactive stream processing.