R2DBC Support

Spring Integration provides channel adapters for receiving and sending messages by using reactive access to databases via R2DBC drivers.

You need to include this dependency in your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>5.5.20</version>
</dependency>
Gradle
implementation "org.springframework.integration:spring-integration-r2dbc:5.5.20"

R2DBC Inbound Channel Adapter

The R2dbcMessageSource is a pollable MessageSource implementation based on R2dbcEntityOperations. It produces messages with a Flux or Mono payload for data fetched from the database, according to the expectSingleResult option. The SELECT query can be provided statically or through a SpEL expression that is evaluated on every receive() call. The R2dbcMessageSource.SelectCreator is the root object of the evaluation context, which allows use of the StatementMapper.SelectSpec fluent API.

By default, this channel adapter maps records from the SELECT result into LinkedCaseInsensitiveMap instances. This can be customized by providing a payloadType option, which is used underneath by the EntityRowMapper based on this.r2dbcEntityOperations.getConverter(). The updateSql option is optional and is used to mark read records in the database so that subsequent polls skip them. The UPDATE operation can be supplied with a BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec> to bind values into the UPDATE based on records in the SELECT result.

A typical configuration for this channel adapter might look like this:

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
            (DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));
    return r2dbcMessageSource;
}

With the Java DSL, a configuration for this channel adapter looks like this:

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlows
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .<Mono<?>>handle((p, h) -> p, e -> e.async(true))
        .channel(MessageChannels.flux())
        .get();
}

R2DBC Outbound Channel Adapter

The R2dbcMessageHandler is a ReactiveMessageHandler implementation that performs an INSERT (default), UPDATE, or DELETE query in the database using a provided R2dbcEntityOperations. The R2dbcMessageHandler.Type can be configured statically or via a SpEL expression evaluated against the request message. The query to execute can be based on the tableName, values, and criteria expression options. If tableName is not provided, the whole message payload is treated as an org.springframework.data.relational.core.mapping.Table entity to perform SQL against. The package org.springframework.data.relational.core.query is registered as an import in the SpEL evaluation context for direct access to the Criteria fluent API, which is used for UPDATE and DELETE queries. The valuesExpression is used for INSERT and UPDATE and must evaluate to a Map of column-value pairs to apply to the target table, based on the request message.
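To make the Map requirement for valuesExpression concrete, here is a minimal sketch in plain Java of a helper that turns a payload into column-value pairs. The PersonUpdate type, the ColumnValues helper, and the column names "name" and "age" are hypothetical and exist only for illustration; they are not part of Spring Integration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical payload type used only for this illustration. */
final class PersonUpdate {
    final String name;
    final Integer age;

    PersonUpdate(String name, Integer age) {
        this.name = name;
        this.age = age;
    }
}

/** Hypothetical helper that builds the column-to-value map for an INSERT or UPDATE. */
final class ColumnValues {

    private ColumnValues() {
    }

    /** Returns one entry per non-null field, keyed by the assumed column name. */
    static Map<String, Object> from(PersonUpdate update) {
        // LinkedHashMap keeps the columns in a predictable insertion order.
        Map<String, Object> values = new LinkedHashMap<>();
        if (update.name != null) {
            values.put("name", update.name);
        }
        if (update.age != null) {
            values.put("age", update.age);
        }
        return values;
    }
}
```

A helper like this could presumably be supplied to the handler through a FunctionExpression, for example new FunctionExpression<Message<PersonUpdate>>(m -> ColumnValues.from(m.getPayload())). Skipping null fields is a design choice of this sketch, so that an UPDATE only touches the columns the message actually carries.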

A typical configuration for this channel adapter might look like this:

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate);
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id').is(headers.personId)"));
    return messageHandler;
}

With the Java DSL, a configuration for this channel adapter looks like this:

.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))