Debezium Support

Spring Integration provides a Change Data Capture (CDC) inbound channel adapter based on the Debezium Engine. The DebeziumMessageProducer captures database change events, converts them into messages, and streams them to its output channel.

You need to include the Spring Integration Debezium dependency in your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-debezium</artifactId>
    <version>6.2.0-M1</version>
</dependency>
Gradle
implementation "org.springframework.integration:spring-integration-debezium:6.2.0-M1"

You also need to include a Debezium connector dependency for your source database. For example, to use Debezium with PostgreSQL, you need the PostgreSQL Debezium connector:

Maven
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium-version}</version>
</dependency>
Gradle
implementation "io.debezium:debezium-connector-postgres:{debezium-version}"

Replace debezium-version with the Debezium version that is compatible with the spring-integration-debezium version in use.

Inbound Debezium Channel Adapter

The Debezium adapter expects a pre-configured DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> instance.

The debezium-supplier provides out-of-the-box Spring Boot auto-configuration for the DebeziumEngine.Builder, together with a handy DebeziumProperties configuration abstraction.

The Debezium Java DSL can create a DebeziumMessageProducer instance from a provided DebeziumEngine.Builder, as well as from a plain Debezium configuration (for example, java.util.Properties). The latter can be handy for common use cases with an opinionated configuration and serialization format.
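A plain Debezium configuration of the kind mentioned above could be assembled as in the following minimal sketch. It relies only on java.util.Properties. The class name, the connector name, the connection values (host, port, credentials, database name), and the my-topic prefix are illustrative assumptions, and the exact set of required properties depends on the connector and the Debezium version in use.

```java
import java.util.Properties;

// Hypothetical helper class; not part of spring-integration-debezium.
final class DebeziumConfigSketch {

    // Builds a Debezium configuration for a PostgreSQL source database.
    // Every value below is a placeholder and must be adapted to your environment.
    static Properties postgresConfig() {
        Properties config = new Properties();
        config.setProperty("name", "my-connector");
        config.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        // In-memory offset storage: offsets are lost when the application restarts.
        config.setProperty("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
        // Prefix used when composing the logical destination name of each event.
        config.setProperty("topic.prefix", "my-topic");
        config.setProperty("database.hostname", "localhost");
        config.setProperty("database.port", "5432");
        config.setProperty("database.user", "postgres");
        config.setProperty("database.password", "postgres");
        config.setProperty("database.dbname", "postgres");
        return config;
    }
}
```

A Properties instance built this way is the kind of object accepted by Debezium.inboundChannelAdapter(debeziumConfig) in the Java DSL section of this chapter.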

Additionally, the DebeziumMessageProducer can be tuned with the following configuration properties:

  • contentType - selects how the message content is handled: JSON (default), AVRO, or PROTOBUF. The contentType must be aligned with the SerializationFormat configured for the provided DebeziumEngine.Builder.

  • enableBatch - when set to false (default), the Debezium adapter sends a new Message for every ChangeEvent received from the source database. When set to true, the adapter sends downstream a single Message for each batch of ChangeEvent instances received from the Debezium engine. Such a payload is not serializable and requires a custom serialization/deserialization implementation.

  • enableEmptyPayload - Enables support for tombstone (aka delete) messages. On a database row delete, Debezium can send a tombstone change event that has the same key as the deleted row and a value of Optional.empty. Defaults to false.

  • headerMapper - a custom HeaderMapper implementation that selects and converts the ChangeEvent headers into Message headers. The default DefaultDebeziumHeaderMapper implementation provides a setHeaderNamesToMap setter for choosing the headers to map. By default, all headers are mapped.

  • taskExecutor - Set a custom TaskExecutor for the Debezium engine.
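With enableEmptyPayload set to true, a handler has to cope with payloads that carry no value. The following minimal sketch shows one way to distinguish the two cases. It assumes that a tombstone surfaces as an Optional.empty() payload, as described above, and that a regular event arrives as a byte[]. The class name, the method name, and the <tombstone> marker are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper; the class and method names are illustrative only.
final class ChangeEventPayloads {

    // Renders a message payload as text, for example for logging.
    // Assumption: a tombstone arrives as Optional.empty(), a regular event as byte[].
    static String describe(Object payload) {
        if (payload instanceof Optional && !((Optional<?>) payload).isPresent()) {
            return "<tombstone>";
        }
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        // Fallback for any other payload type.
        return String.valueOf(payload);
    }
}
```

A service activator could call ChangeEventPayloads.describe(message.getPayload()) instead of casting the payload to byte[] unconditionally.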

The following code snippets demonstrate various configurations for this channel adapter:

Configuring with Java Configuration

The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public MessageChannel debeziumInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer debeziumMessageProducer(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
            MessageChannel debeziumInputChannel) {

        DebeziumMessageProducer debeziumMessageProducer =
            new DebeziumMessageProducer(debeziumEngineBuilder);
        debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
        return debeziumMessageProducer;
    }

    @ServiceActivator(inputChannel = "debeziumInputChannel")
    public void handler(Message<?> message) {

        Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); // (1)

        String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); // (2)

        String payload = new String((byte[]) message.getPayload()); // (3)

        System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
    }

}
(1) The name of the logical destination for which the event is intended. Usually the destination is composed of the topic.prefix configuration option, the database name, and the table name. For example: my-topic.inventory.orders.
(2) Contains the schema for the changed table’s key and the changed row’s actual key. Both the key schema and its corresponding key payload contain a field for each column in the changed table’s PRIMARY KEY (or unique constraint) at the time the connector created the event.
(3) Like the key, the payload has a schema section and a payload value section. The schema section contains the schema that describes the Envelope structure of the payload value section, including its nested fields. Change events for operations that create, update, or delete data all have a value payload with an envelope structure.

Setting key.converter.schemas.enable=false and/or value.converter.schemas.enable=false disables the in-message schema content for the key or the payload, respectively.
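The naming convention of the destination header described above can be illustrated with a small parsing sketch. It assumes the three-part form prefix.database.table given in the example. The DestinationParts type is hypothetical, and for some connectors the middle segment may be a schema name rather than a database name.

```java
// Hypothetical value type; not part of spring-integration-debezium.
final class DestinationParts {

    final String prefix;
    final String database;
    final String table;

    private DestinationParts(String prefix, String database, String table) {
        this.prefix = prefix;
        this.database = database;
        this.table = table;
    }

    // Splits "<prefix>.<database>.<table>" at its last two dots,
    // so that a prefix which itself contains dots is kept intact.
    static DestinationParts parse(String destination) {
        int tableDot = destination.lastIndexOf('.');
        int databaseDot = destination.lastIndexOf('.', tableDot - 1);
        if (tableDot < 0 || databaseDot < 0) {
            throw new IllegalArgumentException("Unexpected destination format: " + destination);
        }
        return new DestinationParts(
                destination.substring(0, databaseDot),
                destination.substring(databaseDot + 1, tableDot),
                destination.substring(tableDot + 1));
    }
}
```

For the my-topic.inventory.orders example, parse yields the prefix my-topic, the database inventory, and the table orders, which can be useful for routing messages by table.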

Similarly, we can configure the DebeziumMessageProducer to process the incoming change events in batches:

@Bean
public MessageProducer debeziumMessageProducer(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
        MessageChannel debeziumInputChannel) {

    DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
    debeziumMessageProducer.setEnableBatch(true);
    debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
    return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
    System.out.println(payload);
}

Debezium Java DSL Support

The spring-integration-debezium module provides a convenient Java DSL fluent API via the Debezium factory and the DebeziumMessageProducerSpec implementation.

The inbound channel adapter can be declared with the Debezium Java DSL as follows:

DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
        Debezium.inboundChannelAdapter(debeziumEngineBuilder)
            .headerNames("special*")
            .contentType("application/json")
            .enableBatch(false))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

Alternatively, create a DebeziumMessageProducerSpec instance from native Debezium configuration properties, which defaults to the JSON serialization format:

Properties debeziumConfig = ...
IntegrationFlow
    .from(Debezium.inboundChannelAdapter(debeziumConfig))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

The following Spring Boot application provides an example of configuring the inbound adapter with the Java DSL:

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public IntegrationFlow debeziumInbound(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {

        return IntegrationFlow
                .from(Debezium
                        .inboundChannelAdapter(debeziumEngineBuilder)
                        .headerNames("special*")
                        .contentType("application/json")
                        .enableBatch(false))
                .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
                .get();
    }

}