The WebFlux Spring Integration module (spring-integration-webflux
) allows for the execution of HTTP requests and the processing of inbound HTTP requests in Reactive manner.
The WebFlux support consists of the following gateway implementations: WebFluxInboundEndpoint
, WebFluxRequestExecutingMessageHandler
.
The implementation is fully based on the Spring WebFlux and Project Reactor foundations.
Also see Chapter 18, HTTP Support for more information since many options are shared between reactive and regular HTTP components.
Starting with version 5.0, the WebFluxInboundEndpoint
, WebHandler
, implementation is provided.
This component is similar to the MVC-based HttpRequestHandlingEndpointSupport
with which it shares some common options via the newly extracted BaseHttpInboundEndpoint
.
Instead of MVC, it is used in the Spring WebFlux Reactive environment.
A simple sample for explanation:
@Configuration @EnableWebFlux @EnableIntegration public class ReactiveHttpConfiguration { @Bean public WebFluxInboundEndpoint simpleInboundEndpoint() { WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint(); RequestMapping requestMapping = new RequestMapping(); requestMapping.setPathPatterns("/test"); endpoint.setRequestMapping(requestMapping); endpoint.setRequestChannelName("serviceChannel"); return endpoint; } @ServiceActivator(inputChannel = "serviceChannel") String service() { return "It works!"; } }
As can be seen, the configuration is similar to the HttpRequestHandlingEndpointSupport
mentioned above, except that we use @EnableWebFlux
to add the WebFlux infrastructure to our integration application.
Also, the WebFluxInboundEndpoint
performs sendAndReceive
operation to the downstream flow using back-pressure, on demand based capabilities, provided by the reactive HTTP server implementation.
Note | |
---|---|
The reply part is non-blocking as well and based on the internal |
The WebFluxInboundEndpoint
can be configured with a custom ServerCodecConfigurer
, RequestedContentTypeResolver
and even a ReactiveAdapterRegistry
.
The latter provides a mechanism where we can return a reply as any reactive type - Reactor Flux
, RxJava Observable
, Flowable
etc.
This way, we can simply implement Server Sent Events scenarios with Spring Integration components:
@Bean public IntegrationFlow sseFlow() { return IntegrationFlows .from(WebFlux.inboundGateway("/sse") .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))) .handle((p, h) -> Flux.just("foo", "bar", "baz")) .get(); }
Also see Section 18.4.3, “Request Mapping Support” and Section 18.4.4, “Cross-Origin Resource Sharing (CORS) Support” for more possible configuration options.
The WebFluxRequestExecutingMessageHandler
(starting with version 5.0) implementation is very similar to HttpRequestExecutingMessageHandler
, using a WebClient
from the Spring Framework WebFlux module.
To configure it, define a bean like this:
<bean id="httpReactiveOutbound" class="org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler"> <constructor-arg value="http://localhost:8080/example" /> <property name="outputChannel" ref="responseChannel" /> </bean>
You can configure a WebClient
instance to use:
<beans:bean id="webClient" class="org.springframework.web.reactive.function.client.WebClient" factory-method="create"/> <bean id="httpReactiveOutbound" class="org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler"> <constructor-arg value="http://localhost:8080/example" /> <constructor-arg re="webClient" /> <property name="outputChannel" ref="responseChannel" /> </bean>
The WebClient
exchange()
operation returns a Mono<ClientResponse>
which is mapped (using several Mono.map()
steps) to an AbstractIntegrationMessageBuilder
as the output from the WebFluxRequestExecutingMessageHandler
.
Together with the ReactiveChannel
as an outputChannel
, the Mono<ClientResponse>
evaluation is deferred until a downstream subscription is made.
Otherwise, it is treated as an async
mode and the Mono
response is adapted to an SettableListenableFuture
for an asynchronous reply from the WebFluxRequestExecutingMessageHandler
.
The target payload of the output message depends on the WebFluxRequestExecutingMessageHandler
configuration.
The setExpectedResponseType(Class<?>)
or setExpectedResponseTypeExpression(Expression)
identifies the target type of the response body element conversion.
If the replyPayloadToFlux
is set to true
, the response body is converted to a Flux
with the provided expectedResponseType
for each element and this Flux
is sent as the payload downstream.
A splitter afterwards can be used to iterate over this Flux
in a reactive manner.
In addition a BodyExtractor<?, ClientHttpResponse>
can be injected into the WebFluxRequestExecutingMessageHandler
instead of expectedResponseType
and replyPayloadToFlux
properties.
It can be used for low-level access to the ClientHttpResponse
and more control over body and HTTP headers conversion.
The ClientHttpResponseBodyExtractor
is provided out-of-the-box as identity function to produce downstream the whole ClientHttpResponse
and any other possible custom logic.
Also see Section 18.3, “Http Outbound Components” for more possible configuration options.
Spring Integration provides a webflux namespace and the corresponding schema definition. To include it in your configuration, simply provide the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/webflux http://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd"> ... </beans>
To configure Spring Integration WebFlux via XML you may use appropriate components from the mentioned int-webflux
namespace - inbound-channel-adapter
or inbound-gateway
according request/response requirements respectively:
<inbound-channel-adapter id="reactiveFullConfig" channel="requests" path="test1" auto-startup="false" phase="101" request-payload-type="byte[]" error-channel="errorChannel" payload-expression="payload" supported-methods="PUT" status-code-expression="'202'" header-mapper="headerMapper" codec-configurer="codecConfigurer" reactive-adapter-registry="reactiveAdapterRegistry" requested-content-type-resolver="requestedContentTypeResolver"> <request-mapping headers="foo"/> <cross-origin origin="foo" method="PUT"/> <header name="foo" expression="'foo'"/> </inbound-channel-adapter> <inbound-gateway id="reactiveFullConfig" request-channel="requests" path="test1" auto-startup="false" phase="101" request-payload-type="byte[]" error-channel="errorChannel" payload-expression="payload" supported-methods="PUT" reply-timeout-status-code-expression="'504'" header-mapper="headerMapper" codec-configurer="codecConfigurer" reactive-adapter-registry="reactiveAdapterRegistry" requested-content-type-resolver="requestedContentTypeResolver"> <request-mapping headers="foo"/> <cross-origin origin="foo" method="PUT"/> <header name="foo" expression="'foo'"/> </inbound-gateway>
If you want to execute the http request in a reactive, non-blocking way, you can use the outbound-gateway
or outbound-channel-adapter
.
<int-webflux:outbound-gateway id="reactiveExample1" request-channel="requests" url="http://localhost/test" http-method-expression="headers.httpMethod" extract-request-payload="false" expected-response-type-expression="payload" charset="UTF-8" reply-timeout="1234" reply-channel="replies"/> <int-webflux:outbound-channel-adapter id="reactiveExample2" url="http://localhost/example" http-method="GET" channel="requests" charset="UTF-8" extract-payload="false" expected-response-type="java.lang.String" order="3" auto-startup="false"/>
Inbound Gateway Using Java Configuration.
@Bean public WebFluxInboundEndpoint jsonInboundEndpoint() { WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint(); RequestMapping requestMapping = new RequestMapping(); requestMapping.setPathPatterns("/persons"); endpoint.setRequestMapping(requestMapping); endpoint.setRequestChannel(fluxResultChannel()); return endpoint; } @Bean public MessageChannel fluxResultChannel() { return new FluxMessageChannel(); } @ServiceActivator(inputChannel = "fluxResultChannel") Flux<Person> getPersons() { return Flux.just(new Person("Jane"), new Person("Jason"), new Person("John")); }
Inbound Gateway Using the Java DSL.
@Bean public IntegrationFlow inboundChannelAdapterFlow() { return IntegrationFlows .from(WebFlux.inboundChannelAdapter("/reactivePost") .requestMapping(m -> m.methods(HttpMethod.POST)) .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class)) .statusCodeFunction(m -> HttpStatus.ACCEPTED)) .channel(c -> c.queue("storeChannel")) .get(); }
Outbound Gateway Using Java Configuration.
@ServiceActivator(inputChannel = "reactiveHttpOutRequest") @Bean public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) { WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client); handler.setHttpMethod(HttpMethod.POST); handler.setExpectedResponseType(String.class); return handler; }
Outbound Gateway Using the Java DSL.
@Bean public IntegrationFlow outboundReactive() { return f -> f .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m -> UriComponentsBuilder.fromUriString("http://localhost:8080/foo") .queryParams(m.getPayload()) .build() .toUri()) .httpMethod(HttpMethod.GET) .expectedResponseType(String.class)); }
Since WebFlux components are fully based on the HTTP protocol there is no difference in the HTTP headers mapping. See Section 18.8, “HTTP Header Mappings” for more possible options and components to use for mapping headers.