33. WebFlux Support

33.1 Introduction

The WebFlux Spring Integration module (spring-integration-webflux) allows HTTP requests to be executed, and inbound HTTP requests to be processed, in a reactive manner. The WebFlux support consists of the following gateway implementations: WebFluxInboundEndpoint and WebFluxRequestExecutingMessageHandler. The implementation is fully based on Spring WebFlux and Project Reactor. Also see Chapter 17, HTTP Support, since many options are shared between the reactive and regular HTTP components.

33.2 WebFlux Inbound Components

Starting with version 5.0, the WebFluxInboundEndpoint, a WebHandler implementation, is provided. This component is similar to the MVC-based HttpRequestHandlingEndpointSupport, with which it shares some common options through the newly extracted BaseHttpInboundEndpoint. It is used in the Spring WebFlux reactive environment instead of MVC. A simple example:

@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}

As can be seen, the configuration is similar to that of the HttpRequestHandlingEndpointSupport mentioned above, except that we use @EnableWebFlux to add the WebFlux infrastructure to our integration application. Also, the WebFluxInboundEndpoint performs a sendAndReceive operation to the downstream flow using the back-pressure, on-demand capabilities provided by the reactive HTTP server implementation.

Note

The reply part is non-blocking as well and is based on the internal FutureReplyChannel, which is flat-mapped to a reply Mono for on-demand resolution.

The WebFluxInboundEndpoint can be configured with a custom ServerCodecConfigurer, a RequestedContentTypeResolver, and even a ReactiveAdapterRegistry. The latter provides a mechanism by which we can return a reply as any reactive type: Reactor Flux, RxJava Observable, Flowable, and so on. This way, we can implement Server-Sent Events scenarios with Spring Integration components:

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlows
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}

Also see Section 17.4.3, “Request Mapping Support” and Section 17.4.4, “Cross-Origin Resource Sharing (CORS) Support” for more possible configuration options.
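The customization options named earlier in this section (ServerCodecConfigurer, RequestedContentTypeResolver, and ReactiveAdapterRegistry) might be wired as in the following minimal sketch. The bean name, the "/custom" path, and the reuse of "serviceChannel" are illustrative assumptions, and the setter names are given as we understand them from the WebFluxInboundEndpoint API, so check them against the Javadoc of the version you use. The instances created here are plain defaults, shown only to mark where custom ones would be supplied.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.web.reactive.accept.RequestedContentTypeResolverBuilder;

@Configuration
public class CustomizedInboundConfiguration {

    @Bean
    public WebFluxInboundEndpoint customizedInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        // Hypothetical path and channel name, chosen for illustration only.
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/custom");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");

        // Codecs used to read request bodies and write replies;
        // register custom readers and writers on this object if needed.
        endpoint.setCodecConfigurer(ServerCodecConfigurer.create());

        // Strategy that decides which media types the client has requested.
        endpoint.setRequestedContentTypeResolver(
                new RequestedContentTypeResolverBuilder().build());

        // Registry that adapts reply types such as Flux or RxJava types
        // to a Reactive Streams Publisher.
        endpoint.setReactiveAdapterRegistry(new ReactiveAdapterRegistry());

        return endpoint;
    }

}
```

As with the earlier configuration, this assumes @EnableWebFlux and @EnableIntegration are present elsewhere in the application context.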

33.3 WebFlux Outbound Components

The WebFluxRequestExecutingMessageHandler (available since version 5.0) is very similar to the HttpRequestExecutingMessageHandler, but uses a WebClient from the Spring Framework WebFlux module. To configure it, define a bean like this:

<bean id="httpReactiveOutbound"
    class="org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler">
     <constructor-arg value="http://localhost:8080/example" />
     <property name="outputChannel" ref="responseChannel" />
</bean>

You can configure a WebClient instance to use:

<bean id="webClient" class="org.springframework.web.reactive.function.client.WebClient"
    factory-method="create"/>

<bean id="httpReactiveOutbound"
    class="org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler">
     <constructor-arg value="http://localhost:8080/example" />
     <constructor-arg ref="webClient" />
     <property name="outputChannel" ref="responseChannel" />
</bean>

The WebClient exchange() operation returns a Mono<ClientResponse>, which is mapped (using Mono.map()) to an AbstractIntegrationMessageBuilder as the output from the WebFluxRequestExecutingMessageHandler. When a ReactiveChannel is used as the outputChannel, evaluation of the Mono<ClientResponse> is deferred until a downstream subscription is made. Otherwise, the handler operates in async mode and the Mono response is adapted to a SettableListenableFuture for an asynchronous reply from the WebFluxRequestExecutingMessageHandler.
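The difference between the deferred and the async behavior can be illustrated with a JDK-only analogy. This is not Spring Integration or Reactor code: a Supplier stands in for the Mono that is evaluated only on subscription, a CompletableFuture stands in for the future-based async reply, and simulatedExchange() is a hypothetical placeholder that performs no real HTTP call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class DeferredVersusAsyncSketch {

    // Counts how many times the simulated exchange has actually run.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for the remote exchange; no HTTP is performed.
    static String simulatedExchange() {
        CALLS.incrementAndGet();
        return "response";
    }

    // Deferred mode: building the pipeline runs nothing;
    // the exchange happens only when the consumer asks for the value.
    static Supplier<String> deferred() {
        return DeferredVersusAsyncSketch::simulatedExchange;
    }

    // Async mode: the exchange is started right away on another thread
    // and the result is delivered through a future.
    static CompletableFuture<String> async() {
        return CompletableFuture.supplyAsync(DeferredVersusAsyncSketch::simulatedExchange);
    }

    public static void main(String[] args) {
        Supplier<String> lazy = deferred();
        System.out.println("calls before demand: " + CALLS.get());
        System.out.println("deferred result: " + lazy.get());
        System.out.println("async result: " + async().join());
        System.out.println("total calls: " + CALLS.get());
    }
}
```

In the deferred case the caller controls when the work starts, which is what allows demand from downstream to drive the request. In the async case the work is already in flight by the time the caller receives the future.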

Also see Section 17.3, “Http Outbound Components” for more possible configuration options.

33.4 WebFlux Namespace Support

33.4.1 Introduction

Spring Integration provides a webflux namespace and the corresponding schema definition. To include it in your configuration, simply provide the following namespace declaration in your application context configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    http://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

33.4.2 Inbound

33.4.3 Outbound

To execute an HTTP request in a reactive, non-blocking way, use the outbound-gateway or the outbound-channel-adapter:

<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

33.5 Configuring WebFlux Endpoints with Java

Inbound Gateway Using Java Configuration. 

@Bean
public WebFluxInboundEndpoint jsonInboundEndpoint() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/persons");
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannel(fluxResultChannel());
    return endpoint;
}

@Bean
public MessageChannel fluxResultChannel() {
    return new FluxMessageChannel();
}

@ServiceActivator(inputChannel = "fluxResultChannel")
Flux<Person> getPersons() {
    return Flux.just(new Person("Jane"), new Person("Jason"), new Person("John"));
}

Inbound Channel Adapter Using the Java DSL.

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlows
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}

Outbound Gateway Using Java Configuration. 

@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}

Outbound Gateway Using the Java DSL. 

@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}

33.6 WebFlux Header Mappings

Since WebFlux components are fully based on the HTTP protocol, there is no difference in HTTP header mapping. See Section 17.8, “HTTP Header Mappings” for the available options and components to use for mapping headers.
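As a minimal sketch of reusing the HTTP header mapping components with a WebFlux endpoint, the following configuration attaches a DefaultHttpHeaderMapper to the outbound handler. The bean name and the "X-Trace-Id" header are hypothetical, and a WebClient bean is assumed to be defined elsewhere. The URL and input channel are borrowed from the earlier outbound example.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.http.support.DefaultHttpHeaderMapper;
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class HeaderMappingConfiguration {

    @ServiceActivator(inputChannel = "reactiveHttpOutRequest")
    @Bean
    public WebFluxRequestExecutingMessageHandler headerMappingOutbound(WebClient client) {
        WebFluxRequestExecutingMessageHandler handler =
            new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
        handler.setHttpMethod(HttpMethod.GET);
        handler.setExpectedResponseType(String.class);

        // Start from the standard outbound mapper and restrict which message
        // headers are copied to the HTTP request.
        // "X-Trace-Id" is a hypothetical custom header used for illustration.
        DefaultHttpHeaderMapper mapper = DefaultHttpHeaderMapper.outboundMapper();
        mapper.setOutboundHeaderNames("Content-Type", "X-Trace-Id");
        handler.setHeaderMapper(mapper);

        return handler;
    }

}
```

The same mapper type is the one described for the regular HTTP components, which is why no WebFlux-specific mapping class appears here.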