R2DBC Support
Spring Integration provides channel adapters for receiving and sending messages by using reactive access to databases via R2DBC drivers.
You need to include this dependency into your project:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-r2dbc</artifactId>
<version>5.5.20</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:5.5.20"
R2DBC Inbound Channel Adapter
The R2dbcMessageSource
is a pollable MessageSource
implementation based on the R2dbcEntityOperations
and produces messages with a Flux
or Mono
as a payload for data fetched from database according an expectSingleResult
option.
The query to SELECT
can be statically provided or based on a SpEL expression which is evaluated on every receive()
call.
The R2dbcMessageSource.SelectCreator
is present as a root object for evaluation context to allow to use a StatementMapper.SelectSpec
fluent API.
By default this channel adapter maps records from the select into a LinkedCaseInsensitiveMap
instances.
It can be customized providing a payloadType
options which is used underneath by the EntityRowMapper
based on the this.r2dbcEntityOperations.getConverter()
.
The updateSql
is optional and used to mark read records in the databased for skipping from the subsequent polls.
The UPDATE
operation can be supplied with a BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>
to bind values into an UPDATE
based on records in the SELECT
result.
A typical configuration for this channel adapter might look like this:
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
"SELECT * FROM person WHERE name='Name'");
r2dbcMessageSource.setPayloadType(Person.class);
r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
r2dbcMessageSource.setBindFunction(
(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
return r2dbcMessageSource;
}
With Java DSL a configuration for this channel adapter is like this:
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlows
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
(selectCreator) ->
selectCreator.createSelect("person")
.withProjection("*")
.withCriteria(Criteria.where("id").is(1)))
.expectSingleResult(true)
.payloadType(Person.class)
.updateSql("UPDATE Person SET id='2' where id = :id")
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)))
.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
.channel(MessageChannels.flux())
.get();
}
R2DBC Outbound Channel Adapter
The R2dbcMessageHandler
is a ReactiveMessageHandler
implementation to perform an INSERT
(default), UPDATE
or DELETE
query in database using a provided R2dbcEntityOperations
.
The R2dbcMessageHandler.Type
can be configured statically or via a SpEL expression against request message.
The query to execute can be based on the tableName
, values
and criteria
expression options or (if tableName
is not provided) the whole message payload is treated as an org.springframework.data.relational.core.mapping.Table
entity to perform SQL against.
The package org.springframework.data.relational.core.query
is registered as an import into a SpEL evaluation context for direct access to the Criteria
fluent API which is used for UPDATE
and DELETE
queries.
The valuesExpression
is used in the INSERT
and UPDATE
and must be evaluated to the Map
for column-value pairs to perform a change in the target table against request message.
A typical configuration for this channel adapter might look like this:
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
messageHandler.setCriteriaExpression(
EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
return messageHandler;
}
With Java DSL a configuration for this channel adapter is like this:
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
.values("{age:36}"))