Apache Cassandra Support
Spring Integration provides channel adapters (starting with version 6.0) for performing database operations against an Apache Cassandra cluster. It is fully based on the Spring Data for Apache Cassandra project.
You need to include this dependency into your project:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-cassandra</artifactId>
<version>6.2.0-M1</version>
</dependency>
compile "org.springframework.integration:spring-integration-cassandra:6.2.0-M1"
Cassandra Outbound Components
The CassandraMessageHandler
is an AbstractReplyProducingMessageHandler
implementation and can work in both one-way (default) and request-reply modes (a producesReply
option).
It is asynchronous by default (setAsync(false)
to reset) and performs reactive INSERT
, UPDATE
, DELETE
or STATEMENT
operations against the provided ReactiveCassandraOperations
.
The type of operation can be configured via the CassandraMessageHandler.Type
option.
The ingestQuery
sets the mode into an INSERT
; the query
or statementExpression
, or statementProcessor
sets the mode into a STATEMENT
.
The following code snippets demonstrate various configuration for this channel adapter or gateway:
@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
return flow -> flow
.handle(Cassandra.outboundGateway(cassandraOperations)
.query("SELECT * FROM book WHERE author = :author limit :size")
.parameter("author", "payload")
.parameter("size", m -> m.getHeaders().get("limit")))
.channel(c -> c.flux("resultChannel"));
}
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
integrationFlow {
handle(
Cassandra.outboundChannelAdapter(cassandraOperations)
.statementExpression("T(QueryBuilder).truncate('book').build()")
) { async(false) }
}
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");
Map<String, Expression> params = new HashMap<>();
params.put("author", PARSER.parseExpression("payload"));
params.put("size", PARSER.parseExpression("headers.limit"));
cassandraMessageHandler.setParameterExpressions(params);
cassandraMessageHandler.setOutputChannel(resultChannel());
cassandraMessageHandler.setProducesReply(true);
return cassandraMessageHandler;
}
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
auto-startup="false"
async="false"/>
<int-cassandra:outbound-gateway id="outgateway"
request-channel="input"
cassandra-template="cassandraTemplate"
mode="STATEMENT"
write-options="writeOptions"
query="SELECT * FROM book limit :size"
reply-channel="resultChannel"
auto-startup="true">
<int-cassandra:parameter-expression name="author" expression="payload"/>
<int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
If a CassandraMessageHandler
is used as a gateway in the default async mode, a Mono<WriteResult>
is produced, which is handled according to the provided MessageChannel
implementation.
For true reactive processing a FluxMessageChannel
is recommended for the output channel configuration.
In sync mode Mono.block()
is called to obtain the reply value.
If INSERT
, UPDATE
or DELETE
operations are performed, an entity (marked org.springframework.data.cassandra.core.mapping.Table
) is expected in the request message payload.
If the payload is a list of entities, then the respective batch operation is performed.
The ingestQuery
mode expects the payload to be present as a matrix of values to insert - List<List<?>>
.
For example, if the entity is like this:
@Table("book")
public record Book(@PrimaryKey String isbn,
String title,
@Indexed String author,
int pages,
LocalDate saleDate,
boolean isInStock) {
}
And channel adapter has this configuration:
@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}
The request message payload must be converted like this:
List<List<Object>> ingestBooks =
payload.stream()
.map(book ->
List.<Object>of(
book.isbn(),
book.title(),
book.author(),
book.pages(),
book.saleDate(),
book.isInStock()))
.toList();
For more sophisticated use-cases, the payload can be as an instance of com.datastax.oss.driver.api.core.cql.Statement
.
The com.datastax.oss.driver.api.querybuilder.QueryBuilder
API is recommended to build various statements to execute against Apache Cassandra.
For example, to remove all the data from the Book
table, a message with a payload like this can be sent to the CassandraMessageHandler
: QueryBuilder.truncate("book").build()
.
Alternatively, for logic based on a request message, a statementExpression
or statementProcessor
can be provided for the CassandraMessageHandler
to build a Statement
based on that message.
For convenience, a com.datastax.oss.driver.api.querybuilder
is registered as an import
into a SpEL evaluation context, so a target expression can be as simple as this:
statement-expression="T(QueryBuilder).selectFrom("book").all()"
The setParameterExpressions(Map<String, Expression> parameterExpressions)
represents bindable named query parameters and is used only with a setQuery(String query)
option.
See Java and XML samples mentioned above.