Programming Model

The Kafka Streams binder lets you write applications with either the high-level Streams DSL alone or a mix of the DSL and the lower-level Processor API. Mixing the two is usually achieved by invoking the transform or process API methods on KStream.

Functional Style

Starting with Spring Cloud Stream 3.0.0, Kafka Streams binder allows the applications to be designed and developed using the functional programming style that is available in Java 8. This means that the applications can be concisely represented as a lambda expression of types java.util.function.Function or java.util.function.Consumer.

Let’s take a very basic example.

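import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SimpleConsumerApplication {

    // The bean name "process" determines the default binding name (process-in-0).
    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }

    public static void main(String[] args) {
        SpringApplication.run(SimpleConsumerApplication.class, args);
    }
}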

Albeit simple, this is a complete standalone Spring Boot application that is leveraging Kafka Streams for stream processing. This is a consumer application with no outbound binding and only a single inbound binding. The application consumes data and it simply logs the information from the KStream key and value on the standard output. The application contains the SpringBootApplication annotation and a method that is marked as Bean. The bean method is of type java.util.function.Consumer which is parameterized with KStream. Then in the implementation, we are returning a Consumer object that is essentially a lambda expression. Inside the lambda expression, the code for processing the data is provided.

In this application, there is a single input binding that is of type KStream. The binder creates this binding for the application with a name process-in-0, i.e. the name of the function bean name followed by a dash character (-) and the literal in followed by another dash and then the ordinal position of the parameter. You use this binding name to set other properties such as destination. For example, spring.cloud.stream.bindings.process-in-0.destination=my-topic.
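
The naming rule above can be sketched in plain Java. This is only an illustration of the convention described in this section, not the binder's own code; the class and method names (BindingNames, input, output) are hypothetical. Output bindings, covered later in this section, follow the same pattern with out in place of in.

```java
// Hypothetical helper that mirrors the default binding-name convention
// described above: <functionName>-<in|out>-<index>.
class BindingNames {

    // Name of the input binding at the given zero-based parameter position.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding at the given zero-based position.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // Property key used to point the first input of "process" at a topic.
        String key = "spring.cloud.stream.bindings." + input("process", 0) + ".destination";
        System.out.println(key + "=my-topic");
    }
}
```

Running the sketch prints the same property shown in the paragraph above.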

If the destination property is not set on the binding, a topic is created with the same name as the binding (if there are sufficient privileges for the application) or that topic is expected to be already available.

Once built as an uber-jar (e.g., kstream-consumer-app.jar), you can run the above example as follows.

java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

If applications choose to define the functional beans using Spring’s Component annotation, the binder also supports that model. The above functional bean could be rewritten as below.

@Component("process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}

Here is another example: a full processor with both input and output bindings. This is the classic word-count example, in which the application receives data from a topic and computes the number of occurrences of each word in a tumbling time window.

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

  public static void main(String[] args) {
    SpringApplication.run(WordCountProcessorApplication.class, args);
  }
}

Here again, this is a complete Spring Boot application. The difference from the first application is that the bean method is of type java.util.function.Function. The first parameterized type of the Function is for the input KStream and the second one is for the output. In the method body, a lambda expression of type Function is provided, and its implementation carries the actual business logic. Similar to the previously discussed Consumer based application, the input binding here is named process-in-0 by default. The output binding name is likewise set automatically, to process-out-0.

Once built as an uber-jar (e.g., wordcount-processor.jar), you can run the above example like the following.

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

This application will consume messages from the Kafka topic words and the computed results are published to an output topic counts.

Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic required in the processor. Setting up Kafka Streams specific configuration required by the Kafka Streams infrastructure is automatically handled by the framework.

The two examples we saw above have a single KStream input binding. In both cases, the binding received records from a single topic. If you want to multiplex multiple topics into a single KStream binding, you can provide comma-separated Kafka topics as the destination, as shown below.

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3

In addition, you can provide topic patterns as destinations if you want to match topics against a regular expression.

spring.cloud.stream.bindings.process-in-0.destination=input.*
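
Because the destination above is a regular expression rather than a shell-style glob, input.* accepts any name that begins with input, including input itself. The following sketch uses only java.util.regex to show which names such a pattern accepts. The topic names are made up, and matching against the whole topic name is an assumption about how the pattern is applied, not something this section states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class TopicPatternSketch {

    // Returns the topics whose full name matches the given regular expression.
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    // Applies the pattern from the text to a few hypothetical topic names.
    static List<String> demo() {
        return matching("input.*",
                Arrays.asList("input", "input-orders", "inputs", "my-input", "output"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [input, input-orders, inputs]
    }
}
```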

Multiple Input Bindings

Many non-trivial Kafka Streams applications consume data from more than one topic through multiple bindings. For instance, one topic is consumed as KStream and another as KTable or GlobalKTable. There are many reasons why an application might want to receive data as a table type. Think of a use case where the underlying topic is populated through a change data capture (CDC) mechanism from a database, or where the application only cares about the latest updates for downstream processing. If the application specifies that the data needs to be bound as KTable or GlobalKTable, then the Kafka Streams binder will properly bind the destination to a KTable or GlobalKTable and make it available for the application to operate upon. We will look at a few different scenarios of how multiple input bindings are handled in the Kafka Streams binder.

BiFunction in Kafka Streams Binder

Here is an example where we have two inputs and an output. In this case, the application can use java.util.function.BiFunction.

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

Here again, the basic theme is the same as in the previous examples, but here we have two inputs. Java’s BiFunction support is used to bind the inputs to the desired destinations. The default binding names generated by the binder for the inputs are process-in-0 and process-in-1 respectively. The default output binding is process-out-0. In this example, the first parameter of BiFunction is bound as a KStream for the first input and the second parameter is bound as a KTable for the second input.

BiConsumer in Kafka Streams Binder

If there are two inputs but no outputs, we can use java.util.function.BiConsumer as shown below.

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {};
}

Beyond two inputs

What if you need more than two inputs? In that case, the binder allows you to chain partial functions. In functional programming jargon, this technique is generally known as currying. With the functional programming support added as part of Java 8, Java enables you to write curried functions. The Spring Cloud Stream Kafka Streams binder can make use of this feature to enable multiple input bindings.

Let’s see an example.

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

Let’s look at the details of the binding model presented above. In this model, we have three partially applied functions on the inbound side. Let’s call them f(x), f(y) and f(z). Expanded in the sense of true mathematical functions, they look like this: f(x) → f(y) → f(z) → KStream<Long, EnrichedOrder>. The x variable stands for KStream<Long, Order>, the y variable stands for GlobalKTable<Long, Customer> and the z variable stands for GlobalKTable<Long, Product>. The first function, f(x), takes the first input binding of the application (KStream<Long, Order>) and its output is the function f(y). The function f(y) takes the second input binding of the application (GlobalKTable<Long, Customer>) and its output is yet another function, f(z). The input for the function f(z) is the third input of the application (GlobalKTable<Long, Product>) and its output is KStream<Long, EnrichedOrder>, which is the final output binding of the application. The inputs of the three partial functions (KStream, GlobalKTable and GlobalKTable, respectively) are available in the method body for implementing the business logic as part of the lambda expression.
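
The chain of partial functions can be tried out with plain java.util.function types. The sketch below is only an analogy: String stands in for the KStream and GlobalKTable types, and the class name and sample values are hypothetical.

```java
import java.util.function.Function;

class CurryingSketch {

    // Three nested single-argument functions; String stands in for the
    // KStream and GlobalKTable types used in the real example.
    static Function<String, Function<String, Function<String, String>>> enrich() {
        return order -> customer -> product ->
                order + " by " + customer + " for " + product;
    }

    public static void main(String[] args) {
        // Supplying the first input yields a function awaiting the second.
        Function<String, Function<String, String>> fy = enrich().apply("order-1");
        // Supplying the second input yields a function awaiting the third.
        Function<String, String> fz = fy.apply("alice");
        // Supplying the third input produces the final result.
        System.out.println(fz.apply("laptop")); // order-1 by alice for laptop
    }
}
```

All three arguments remain visible inside the innermost lambda, which is why the business logic in the real example can refer to orders, customers and products together.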

The input bindings are named enrichOrder-in-0, enrichOrder-in-1 and enrichOrder-in-2, respectively. The output binding is named enrichOrder-out-0.
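
As a sketch of how these binding names might be wired to topics, the properties below follow the destination property pattern used earlier in this section. The topic names (orders, customers, products, enriched-orders) are hypothetical.

```properties
spring.cloud.stream.bindings.enrichOrder-in-0.destination=orders
spring.cloud.stream.bindings.enrichOrder-in-1.destination=customers
spring.cloud.stream.bindings.enrichOrder-in-2.destination=products
spring.cloud.stream.bindings.enrichOrder-out-0.destination=enriched-orders
```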

With curried functions, you can have virtually any number of inputs. However, keep in mind that more than a small number of inputs, each with its own partially applied function as above, can make Java code hard to read. Therefore, if your Kafka Streams application requires more than a handful of input bindings and you want to use this functional model, you may want to rethink your design and decompose the application appropriately.

Output Bindings

Kafka Streams binder allows types of either KStream or KTable as output bindings. Behind the scenes, the binder uses the to method on KStream to send the resultant records to the output topic. If the application provides a KTable as output in the function, the binder still uses this technique by delegating to the to method of KStream.

For example, both functions below will work:

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

Multiple Output Bindings

Kafka Streams allows writing outbound data into multiple topics. This feature is known as branching in Kafka Streams. When using multiple output bindings, you need to provide an array of KStream (KStream[]) as the outbound return type.

Here is an example:

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

The programming model remains the same; however, the outbound parameterized type is KStream[]. The default output binding names for the function above are process-out-0, process-out-1 and process-out-2, respectively. The binder generates three output bindings because it detects that the returned KStream array has a length of three. Note that this example uses noDefaultBranch(); if we had used defaultBranch() instead, that would have required an extra output binding, essentially returning a KStream array of length four.
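
The routing behavior behind the three outputs can be imitated with plain Java collections. This is a simplified sketch and not Kafka Streams code: it assumes that each record goes to the first branch whose predicate matches and that unmatched records are dropped when there is no default branch. The class name and sample words are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class BranchingSketch {

    // Sends each word to the first branch whose predicate matches.
    // Words that match no predicate are dropped, as with noDefaultBranch().
    static List<List<String>> branch(List<String> words, List<Predicate<String>> predicates) {
        List<List<String>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            outputs.add(new ArrayList<>());
        }
        for (String word : words) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(word)) {
                    outputs.get(i).add(word);
                    break; // a record goes to at most one branch
                }
            }
        }
        return outputs;
    }

    // Three predicates produce three outputs, analogous to process-out-0..2.
    static List<List<String>> demo() {
        List<Predicate<String>> predicates = Arrays.asList(
                w -> w.equals("english"),
                w -> w.equals("french"),
                w -> w.equals("spanish"));
        return branch(
                Arrays.asList("english", "german", "spanish", "english", "french"),
                predicates);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[english, english], [french], [spanish]]
    }
}
```

The word german matches none of the predicates and is dropped. A default branch would collect it in a fourth output, which is why defaultBranch() requires one more binding.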

Summary of Function based Programming Styles for Kafka Streams

In summary, the following table shows the various options that can be used in the functional paradigm.

Number of Inputs   Number of Outputs   Component to use
1                  0                   java.util.function.Consumer
2                  0                   java.util.function.BiConsumer
1                  1..n                java.util.function.Function
2                  1..n                java.util.function.BiFunction
>= 3               0..n                Use curried functions

  • In the case of more than one output in this table, the type simply becomes KStream[].

Function composition in Kafka Streams binder

Kafka Streams binder supports minimal forms of functional composition for linear topologies. Using the Java functional API support, you can write multiple functions and then compose them on your own using the andThen method. For example, assume that you have the following two functions.

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
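    // Placeholder logic: map each value to its length so that the output value type is Long.
    return input -> input.mapValues(value -> (long) value.length());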
}

Even without the functional composition support in the binder, you can compose these two functions as below.

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    return foo().andThen(bar());
}

Then you can provide definitions of the form spring.cloud.function.definition=foo;bar;composed. With the functional composition support in the binder, you don’t need to write this third function in which you are doing explicit function composition.

You can simply do this instead:

spring.cloud.function.definition=foo|bar

You can even do this:

spring.cloud.function.definition=foo|bar;foo;bar

The composed function’s default binding names in this example become foobar-in-0 and foobar-out-0.
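
To make the ordering of a piped definition concrete, here is a sketch with plain java.util.function types, where String and Long stand in for the KStream types. The composedName helper is hypothetical; it only reproduces the naming shown in this section by dropping the pipe characters, and it is not the binder's own code.

```java
import java.util.function.Function;

class CompositionSketch {

    // Stand-ins for the foo and bar beans; String and Long replace the KStream types.
    static Function<String, String> foo() {
        return String::trim;
    }

    static Function<String, Long> bar() {
        return value -> (long) value.length();
    }

    // Equivalent of the definition foo|bar: foo runs first, bar consumes its output.
    static Function<String, Long> composed() {
        return foo().andThen(bar());
    }

    // Derives the name prefix used in the text by dropping the pipe characters.
    static String composedName(String definition) {
        return definition.replace("|", "");
    }

    public static void main(String[] args) {
        System.out.println(composed().apply("  hello  "));      // 5
        System.out.println(composedName("foo|bar") + "-in-0");  // foobar-in-0
    }
}
```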

Limitations of functional composition in Kafka Streams binder

A java.util.function.Function bean can be composed with another function or with multiple functions. The same function bean can also be composed with a java.util.function.Consumer; in this case, the consumer is the last component composed. A function can likewise be composed with multiple functions and then end with a java.util.function.Consumer bean.

When composing beans of type java.util.function.BiFunction, the BiFunction must be the first function in the definition. The composed entities must be of type either java.util.function.Function or java.util.function.Consumer. In other words, you cannot take a BiFunction bean and then compose it with another BiFunction.

You cannot compose with types of BiConsumer or use definitions where a Consumer is the first component. You also cannot compose with functions whose output is an array (KStream[] for branching) unless such a function is the last component in the definition.
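
The position rules above can be summarized as a small checker. This is a hypothetical sketch written for illustration, not part of the binder: it takes the component kinds in pipe order and, for brevity, does not model the rule about array outputs.

```java
import java.util.Arrays;
import java.util.List;

class CompositionRulesSketch {

    enum Kind { FUNCTION, BI_FUNCTION, CONSUMER, BI_CONSUMER }

    // Checks a pipe-ordered list of component kinds against the rules above.
    static boolean isComposable(List<Kind> components) {
        if (components.size() < 2) {
            return true; // a single component is not a composition, nothing to check
        }
        for (int i = 0; i < components.size(); i++) {
            boolean first = i == 0;
            boolean last = i == components.size() - 1;
            switch (components.get(i)) {
                case BI_CONSUMER:
                    return false; // never part of a composition
                case BI_FUNCTION:
                    if (!first) {
                        return false; // only allowed in the first position
                    }
                    break;
                case CONSUMER:
                    if (first || !last) {
                        return false; // only allowed as the final component
                    }
                    break;
                default:
                    break; // FUNCTION may appear in any position
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isComposable(
                Arrays.asList(Kind.BI_FUNCTION, Kind.FUNCTION, Kind.CONSUMER))); // true
        System.out.println(isComposable(
                Arrays.asList(Kind.BI_FUNCTION, Kind.BI_FUNCTION)));             // false
    }
}
```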

The very first Function or BiFunction in the function definition may also use a curried form. For example, the following is possible.

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

The function definition could then be curriedFoo|bar. Behind the scenes, the binder will create two input bindings for the curried function and an output binding based on the final function in the definition. The default input bindings in this case are curriedFoobar-in-0 and curriedFoobar-in-1. The default output binding for this example is curriedFoobar-out-0.

Special note on using KTable as output in function composition

Let’s say you have the following two functions.

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

You can compose them as foo|bar, but keep in mind that the second function (bar in this case) must have a KTable as input since the first function (foo) has KTable as output.