Apache Cassandra Support

Spring Integration provides channel adapters (starting with version 6.0) for performing database operations against an Apache Cassandra cluster. It is fully based on the Spring Data for Apache Cassandra project.

You need to include this dependency into your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-cassandra</artifactId>
    <version>6.1.0</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-cassandra:6.1.0"

Cassandra Outbound Components

The CassandraMessageHandler is an AbstractReplyProducingMessageHandler implementation and can work in both one-way (default) and request-reply modes (a producesReply option). It is asynchronous by default (setAsync(false) to reset) and performs reactive INSERT, UPDATE, DELETE or STATEMENT operations against the provided ReactiveCassandraOperations. The type of operation can be configured via the CassandraMessageHandler.Type option. The ingestQuery sets the mode into an INSERT; the query or statementExpression, or statementProcessor sets the mode into a STATEMENT.

The following code snippets demonstrate various configuration for this channel adapter or gateway:

Java DSL
@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
    return flow -> flow
            .handle(Cassandra.outboundGateway(cassandraOperations)
                    .query("SELECT * FROM book WHERE author = :author limit :size")
                    .parameter("author", "payload")
                    .parameter("size", m -> m.getHeaders().get("limit")))
            .channel(c -> c.flux("resultChannel"));
}
Kotlin DSL
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
    integrationFlow {
        handle(
            Cassandra.outboundChannelAdapter(cassandraOperations)
                              .statementExpression("T(QueryBuilder).truncate('book').build()")
        ) { async(false) }
    }
Java
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");

    Map<String, Expression> params = new HashMap<>();
    params.put("author", PARSER.parseExpression("payload"));
    params.put("size", PARSER.parseExpression("headers.limit"));

    cassandraMessageHandler.setParameterExpressions(params);

    cassandraMessageHandler.setOutputChannel(resultChannel());
    cassandraMessageHandler.setProducesReply(true);
    return cassandraMessageHandler;
}
XML
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
                                        cassandra-template="cassandraTemplate"
                                        write-options="writeOptions"
                                        auto-startup="false"
                                        async="false"/>

<int-cassandra:outbound-gateway id="outgateway"
                                request-channel="input"
                                cassandra-template="cassandraTemplate"
                                mode="STATEMENT"
                                write-options="writeOptions"
                                query="SELECT * FROM book limit :size"
                                reply-channel="resultChannel"
                                auto-startup="true">
    <int-cassandra:parameter-expression name="author" expression="payload"/>
    <int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>

If a CassandraMessageHandler is used as a gateway in the default async mode, a Mono<WriteResult> is produced, which is handled according to the provided MessageChannel implementation. For true reactive processing a FluxMessageChannel is recommended for the output channel configuration. In sync mode Mono.block() is called to obtain the reply value.

If INSERT, UPDATE or DELETE operations are performed, an entity (marked org.springframework.data.cassandra.core.mapping.Table) is expected in the request message payload. If the payload is a list of entities, then the respective batch operation is performed.

The ingestQuery mode expects the payload to be present as a matrix of values to insert - List<List<?>>. For example, if the entity is like this:

@Table("book")
public record Book(@PrimaryKey String isbn,
                   String title,
                   @Indexed String author,
                   int pages,
                   LocalDate saleDate,
                   boolean isInStock) {

}

And channel adapter has this configuration:

@Bean
public MessageHandler cassandraMessageHandler3() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
    cassandraMessageHandler.setIngestQuery(cqlIngest);
    cassandraMessageHandler.setAsync(false);
    return cassandraMessageHandler;
}

The request message payload must be converted like this:

List<List<Object>> ingestBooks =
    payload.stream()
            .map(book ->
                    List.<Object>of(
                            book.isbn(),
                            book.title(),
                            book.author(),
                            book.pages(),
                            book.saleDate(),
                            book.isInStock()))
            .toList();

For more sophisticated use-cases, the payload can be as an instance of com.datastax.oss.driver.api.core.cql.Statement. The com.datastax.oss.driver.api.querybuilder.QueryBuilder API is recommended to build various statements to execute against Apache Cassandra. For example, to remove all the data from the Book table, a message with a payload like this can be sent to the CassandraMessageHandler: QueryBuilder.truncate("book").build(). Alternatively, for logic based on a request message, a statementExpression or statementProcessor can be provided for the CassandraMessageHandler to build a Statement based on that message. For convenience, a com.datastax.oss.driver.api.querybuilder is registered as an import into a SpEL evaluation context, so a target expression can be as simple as this:

statement-expression="T(QueryBuilder).selectFrom("book").all()"

The setParameterExpressions(Map<String, Expression> parameterExpressions) represents bindable named query parameters and is used only with a setQuery(String query) option. See Java and XML samples mentioned above.