WebFlux Support
The WebFlux Spring Integration module (spring-integration-webflux
) allows for the execution of HTTP requests and the processing of inbound HTTP requests in a reactive manner.
You need to include this dependency into your project:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.3.5"
The io.projectreactor.netty:reactor-netty
dependency must be included in case of non-Servlet-based server configuration.
The WebFlux support consists of the following gateway implementations: WebFluxInboundEndpoint
and WebFluxRequestExecutingMessageHandler
.
The support is fully based on the Spring WebFlux and Project Reactor foundations.
See HTTP Support for more information, since many options are shared between reactive and regular HTTP components.
WebFlux Namespace Support
Spring Integration provides a webflux
namespace and the corresponding schema definition.
To include it in your configuration, add the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux Inbound Components
Starting with version 5.0, the WebFluxInboundEndpoint
implementation of WebHandler
is provided.
This component is similar to the MVC-based HttpRequestHandlingEndpointSupport
, with which it shares some common options through the newly extracted BaseHttpInboundEndpoint
.
It is used in the Spring WebFlux reactive environment (instead of MVC).
The following example shows a simple implementation of a WebFlux endpoint:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
The configuration is similar to the HttpRequestHandlingEndpointSupport
(mentioned prior to the example), except that we use @EnableWebFlux
to add the WebFlux infrastructure to our integration application.
Also, the WebFluxInboundEndpoint
performs sendAndReceive
operations to the downstream flow by using back-pressure, on-demand based capabilities, provided by the reactive HTTP server implementation.
The reply part is non-blocking as well and is based on the internal FutureReplyChannel , which is flat-mapped to a reply Mono for on-demand resolution.
|
You can configure the WebFluxInboundEndpoint
with a custom ServerCodecConfigurer
, a RequestedContentTypeResolver
, and even a ReactiveAdapterRegistry
.
The latter provides a mechanism you can use to return a reply as any reactive type: Reactor Flux
, RxJava Observable
, Flowable
, and others.
This way, we can implement Server Sent Events scenarios with Spring Integration components, as the following example shows:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
See Request Mapping Support and Cross-origin Resource Sharing (CORS) Support for more possible configuration options.
When the request body is empty or payloadExpression
returns null
, the request params (MultiValueMap<String, String>
) is used for a payload
of the target message to process.
Payload Validation
Starting with version 5.2, the WebFluxInboundEndpoint
can be configured with a Validator
.
Unlike the MVC validation in the HTTP Support, it is used to validate elements in the Publisher
to which a request has been converted by the HttpMessageReader
, before performing a fallback and payloadExpression
functions.
The Framework can’t assume how complex the Publisher
object can be after building the final payload.
If there is a requirements to restrict validation visibility for exactly final payload (or its Publisher
elements), the validation should go downstream instead of WebFlux endpoint.
See more information in the Spring WebFlux documentation.
An invalid payload is rejected with an IntegrationWebExchangeBindException
(a WebExchangeBindException
extension), containing all the validation Errors
.
See more in Spring Framework Reference Manual about validation.
WebFlux Outbound Components
The WebFluxRequestExecutingMessageHandler
(starting with version 5.0) implementation is similar to HttpRequestExecutingMessageHandler
.
It uses a WebClient
from the Spring Framework WebFlux module.
To configure it, define a bean similar to the following:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="http://localhost/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="http://localhost/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
The WebClient
exchange()
operation returns a Mono<ClientResponse>
, which is mapped (by using several Mono.map()
steps) to an AbstractIntegrationMessageBuilder
as the output from the WebFluxRequestExecutingMessageHandler
.
Together with the ReactiveChannel
as an outputChannel
, the Mono<ClientResponse>
evaluation is deferred until a downstream subscription is made.
Otherwise, it is treated as an async
mode, and the Mono
response is adapted to a SettableListenableFuture
for an asynchronous reply from the WebFluxRequestExecutingMessageHandler
.
The target payload of the output message depends on the WebFluxRequestExecutingMessageHandler
configuration.
The setExpectedResponseType(Class<?>)
or setExpectedResponseTypeExpression(Expression)
identifies the target type of the response body element conversion.
If replyPayloadToFlux
is set to true
, the response body is converted to a Flux
with the provided expectedResponseType
for each element, and this Flux
is sent as the payload downstream.
Afterward, you can use a splitter to iterate over this Flux
in a reactive manner.
In addition, a BodyExtractor<?, ClientHttpResponse>
can be injected into the WebFluxRequestExecutingMessageHandler
instead of the expectedResponseType
and replyPayloadToFlux
properties.
It can be used for low-level access to the ClientHttpResponse
and more control over body and HTTP headers conversion.
Spring Integration provides ClientHttpResponseBodyExtractor
as a identity function to produce (downstream) the whole ClientHttpResponse
and any other possible custom logic.
Starting with version 5.2, the WebFluxRequestExecutingMessageHandler
supports reactive Publisher
, Resource
, and MultiValueMap
types as the request message payload.
A respective BodyInserter
is used internally to be populated into the WebClient.RequestBodySpec
.
When the payload is a reactive Publisher
, a configured publisherElementType
or publisherElementTypeExpression
can be used to determine a type for the publisher’s element type.
The expression must be resolved to a Class<?>
, String
which is resolved to the target Class<?>
or ParameterizedTypeReference
.
Starting with version 5.5, the WebFluxRequestExecutingMessageHandler
exposes an extractResponseBody
flag (which is true
by default) to return just the response body, or to return the whole ResponseEntity
as the reply message payload, independently of the provided expectedResponseType
or replyPayloadToFlux
.
If a body is not present in the ResponseEntity
, this flag is ignored and the whole ResponseEntity
is returned.
See HTTP Outbound Components for more possible configuration options.
WebFlux Header Mappings
Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping. See HTTP Header Mappings for more possible options and components to use for mapping headers.
WebFlux Request Attributes
Starting with version 6.0, the WebFluxRequestExecutingMessageHandler
can be configured to evaluate request attributes via setAttributeVariablesExpression()
.
This SpEL expression must be evaluated in Map
.
Such a map is then propagated to the WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)
HTTP request configuration callback.
This will be helpful if an information in a form of key-value object needs to be passed from Message
to request and downstream filter will get access to these attributes for further processing.