This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.3!

Mixing high level DSL and low level Processor API

Kafka Streams provides two variants of APIs. It has a higher level DSL like API where you can chain various operations that maybe familiar to a lot of functional programmers. Kafka Streams also gives access to a low level Processor API. The processor API, although very powerful and gives the ability to control things in a much lower level, is imperative in nature. Kafka Streams binder for Spring Cloud Stream, allows you to use either the high level DSL or mixing both the DSL and the processor API. Mixing both of these variants give you a lot of options to control various use cases in an application. Applications can use the transform or process method API calls to get access to the processor API.

Here is a look at how one may combine both the DSL and the processor API in a Spring Cloud Stream application using the process API.

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

Here is an example using the transform API.

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

The process API method call is a terminal operation while the transform API is non terminal and gives you a potentially transformed KStream using which you can continue further processing using either the DSL or the processor API.