WebFlux Support

The WebFlux Spring Integration module (spring-integration-webflux) allows for the execution of HTTP requests and the processing of inbound HTTP requests in a reactive manner.

You need to include this dependency into your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>5.3.6.RELEASE</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-webflux:5.3.6.RELEASE"

The io.projectreactor.netty:reactor-netty dependency must be included in case of non-Servlet-based server configuration.

The WebFlux support consists of the following gateway implementations: WebFluxInboundEndpoint and WebFluxRequestExecutingMessageHandler. The support is fully based on the Spring WebFlux and Project Reactor foundations. See HTTP Support for more information, since many options are shared between reactive and regular HTTP components.

WebFlux Inbound Components

Starting with version 5.0, the WebFluxInboundEndpoint implementation of WebHandler is provided. This component is similar to the MVC-based HttpRequestHandlingEndpointSupport, with which it shares some common options through the newly extracted BaseHttpInboundEndpoint. It is used in the Spring WebFlux reactive environment (instead of MVC). The following example shows a simple implementation of a WebFlux endpoint:

@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}

The configuration is similar to the HttpRequestHandlingEndpointSupport (mentioned prior to the example), except that we use @EnableWebFlux to add the WebFlux infrastructure to our integration application. Also, the WebFluxInboundEndpoint performs sendAndReceive operations to the downstream flow by using back-pressure, on-demand based capabilities, provided by the reactive HTTP server implementation.

The reply part is non-blocking as well and is based on the internal FutureReplyChannel, which is flat-mapped to a reply Mono for on-demand resolution.

You can configure the WebFluxInboundEndpoint with a custom ServerCodecConfigurer, a RequestedContentTypeResolver, and even a ReactiveAdapterRegistry. The latter provides a mechanism you can use to return a reply as any reactive type: Reactor Flux, RxJava Observable, Flowable, and others. This way, we can implement Server Sent Events scenarios with Spring Integration components, as the following example shows:

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlows
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}

See Request Mapping Support and Cross-origin Resource Sharing (CORS) Support for more possible configuration options.

When the request body is empty or payloadExpression returns null, the request params (MultiValueMap<String, String>) is used for a payload of the target message to process.

Payload Validation

Starting with version 5.2, the WebFluxInboundEndpoint can be configured with a Validator. Unlike the MVC validation in the HTTP Support, it is used to validate elements in the Publisher to which a request has been converted by the HttpMessageReader, before performing a fallback and payloadExpression functions. The Framework can’t assume how complex the Publisher object can be after building the final payload. If there is a requirements to restrict validation visibility for exactly final payload (or its Publisher elements), the validation should go downstream instead of WebFlux endpoint. See more information in the Spring WebFlux documentation. An invalid payload is rejected with an IntegrationWebExchangeBindException (a WebExchangeBindException extension), containing all the validation Errors. See more in Spring Framework Reference Manual about validation.

WebFlux Outbound Components

The WebFluxRequestExecutingMessageHandler (starting with version 5.0) implementation is similar to HttpRequestExecutingMessageHandler. It uses a WebClient from the Spring Framework WebFlux module. To configure it, define a bean similar to the following:

<bean id="httpReactiveOutbound"
    class="org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler">
     <constructor-arg value="http://localhost:8080/example" />
     <property name="outputChannel" ref="responseChannel" />
</bean>

You can configure a WebClient instance to use, as the following example shows:

<beans:bean id="webClient" class="org.springframework.web.reactive.function.client.WebClient"
				factory-method="create"/>

<bean id="httpReactiveOutbound"
    class="org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler">
     <constructor-arg value="http://localhost:8080/example" />
     <constructor-arg re="webClient" />
     <property name="outputChannel" ref="responseChannel" />
</bean>

The WebClient exchange() operation returns a Mono<ClientResponse>, which is mapped (by using several Mono.map() steps) to an AbstractIntegrationMessageBuilder as the output from the WebFluxRequestExecutingMessageHandler. Together with the ReactiveChannel as an outputChannel, the Mono<ClientResponse> evaluation is deferred until a downstream subscription is made. Otherwise, it is treated as an async mode, and the Mono response is adapted to a SettableListenableFuture for an asynchronous reply from the WebFluxRequestExecutingMessageHandler. The target payload of the output message depends on the WebFluxRequestExecutingMessageHandler configuration. The setExpectedResponseType(Class<?>) or setExpectedResponseTypeExpression(Expression) identifies the target type of the response body element conversion. If replyPayloadToFlux is set to true, the response body is converted to a Flux with the provided expectedResponseType for each element, and this Flux is sent as the payload downstream. Afterwards, you can use a splitter to iterate over this Flux in a reactive manner.

In addition a BodyExtractor<?, ClientHttpResponse> can be injected into the WebFluxRequestExecutingMessageHandler instead of the expectedResponseType and replyPayloadToFlux properties. It can be used for low-level access to the ClientHttpResponse and more control over body and HTTP headers conversion. Spring Integration provides ClientHttpResponseBodyExtractor as a identity function to produce (downstream) the whole ClientHttpResponse and any other possible custom logic.

Starting with version 5.2, the WebFluxRequestExecutingMessageHandler supports reactive Publisher, Resource, and MultiValueMap types as the request message payload. A respective BodyInserter is used internally to be populated into the WebClient.RequestBodySpec. When the payload is a reactive Publisher, a configured publisherElementType or publisherElementTypeExpression can be used to determine a type for the publisher’s element type. The expression must be resolved to a Class<?>, String which is resolved to the target Class<?> or ParameterizedTypeReference.

See HTTP Outbound Components for more possible configuration options.

WebFlux Namespace Support

Spring Integration provides a webflux namespace and the corresponding schema definition. To include it in your configuration, add the following namespace declaration in your application context configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

Inbound

To configure Spring Integration WebFlux with XML, you need to use appropriate components from the int-webflux namespace: inbound-channel-adapter or inbound-gateway, corresponding to request and response requirements, respectively. The following example shows how to configure both an inbound channel adapter and an inbound gateway:

<inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                         path="test1"
                         auto-startup="false"
                         phase="101"
                         request-payload-type="byte[]"
                         error-channel="errorChannel"
                         payload-expression="payload"
                         supported-methods="PUT"
                         status-code-expression="'202'"
                         header-mapper="headerMapper"
                         codec-configurer="codecConfigurer"
                         reactive-adapter-registry="reactiveAdapterRegistry"
                         requested-content-type-resolver="requestedContentTypeResolver">
    <request-mapping headers="foo"/>
    <cross-origin origin="foo"
                  method="PUT"/>
    <header name="foo" expression="'foo'"/>
</inbound-channel-adapter>

<inbound-gateway id="reactiveFullConfig" request-channel="requests"
                 path="test1"
                 auto-startup="false"
                 phase="101"
                 request-payload-type="byte[]"
                 error-channel="errorChannel"
                 payload-expression="payload"
                 supported-methods="PUT"
                 reply-timeout-status-code-expression="'504'"
                 header-mapper="headerMapper"
                 codec-configurer="codecConfigurer"
                 reactive-adapter-registry="reactiveAdapterRegistry"
                 requested-content-type-resolver="requestedContentTypeResolver">
    <request-mapping headers="foo"/>
    <cross-origin origin="foo"
                  method="PUT"/>
    <header name="foo" expression="'foo'"/>
</inbound-gateway>

Outbound

If you want to execute the HTTP request in a reactive, non-blocking way, you can use the outbound-gateway or outbound-channel-adapter. The following example shows how to configure both an outbound gateway and an outbound channel adapter:

<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

Configuring WebFlux Endpoints with Java

The following example shows how to configure a WebFlux inbound endpoint with Java:

@Bean
public WebFluxInboundEndpoint jsonInboundEndpoint() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/persons");
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannel(fluxResultChannel());
    return endpoint;
}

@Bean
public MessageChannel fluxResultChannel() {
    return new FluxMessageChannel();
}

@ServiceActivator(inputChannel = "fluxResultChannel")
Flux<Person> getPersons() {
    return Flux.just(new Person("Jane"), new Person("Jason"), new Person("John"));
}

The following example shows how to configure a WebFlux inbound gateway with the Java DSL:

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlows
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}

The following example shows how to configure a WebFlux outbound gateway with Java:

@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}

The following example shows how to configure a WebFlux outbound gateway with the Java DSL:

@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}

WebFlux Header Mappings

Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping. See HTTP Header Mappings for more possible options and components to use for mapping headers.