Programming Model
When using the programming model provided by the Kafka Streams binder, you can use either the high-level Streams DSL alone or a mix of the DSL and the lower-level Processor API.
Mixing the two is usually achieved by invoking the transform or process API methods on KStream.
Functional Style
Starting with Spring Cloud Stream 3.0.0, the Kafka Streams binder allows applications to be designed and developed using the functional programming style that is available in Java 8.
This means that an application can be concisely represented as a lambda expression of type java.util.function.Function or java.util.function.Consumer.
Let’s take a very basic example.
@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {
        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}
Albeit simple, this is a complete standalone Spring Boot application that is leveraging Kafka Streams for stream processing.
This is a consumer application with no outbound binding and only a single inbound binding.
The application consumes data and simply prints the key and value of each KStream record to standard output.
The application contains the SpringBootApplication annotation and a method that is marked as Bean.
The bean method is of type java.util.function.Consumer, which is parameterized with KStream.
Then in the implementation, we are returning a Consumer object that is essentially a lambda expression.
Inside the lambda expression, the code for processing the data is provided.
In this application, there is a single input binding that is of type KStream.
The binder creates this binding for the application with the name process-in-0, i.e. the name of the function bean followed by a dash character (-), the literal in, another dash, and then the ordinal position of the parameter.
You use this binding name to set other properties such as destination.
For example, spring.cloud.stream.bindings.process-in-0.destination=my-topic.
If the destination property is not set on the binding, a topic is created with the same name as the binding (if the application has sufficient privileges), or that topic is expected to be already available.
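The naming convention described above can be sketched in plain Java. This is only an illustration of the documented pattern: the BindingNames class and its methods are hypothetical helpers, not part of the binder's API.

```java
// Hypothetical helper, not a binder class: it only mirrors the documented
// convention <functionName>-in-<index> and <functionName>-out-<index>.
final class BindingNames {

    // Default name of the input binding at the given ordinal position.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Default name of the output binding at the given ordinal position.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Property key used to set the destination (topic) of a binding.
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        String binding = input("process", 0);
        System.out.println(destinationProperty(binding) + "=my-topic");
    }
}
```

The same pattern applies to every function bean, so a bean named enrichOrder would get enrichOrder-in-0 as its first input binding.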
Once built as an uber-jar (e.g., kstream-consumer-app.jar), you can run the above example as follows.
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
If the application chooses to define the functional beans using Spring’s Component annotation, the binder also supports that model.
The above functional bean could be rewritten as below.
@Component("process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
Here is another example, a full processor with both input and output bindings. This is the classic word-count example, in which the application receives data from a topic and then computes the number of occurrences of each word in a tumbling time window.
@SpringBootApplication
public class WordCountProcessorApplication {

    @Bean
    public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
        return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                // Grouped replaces the older Serialized class
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                // five-second tumbling window
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}
Here again, this is a complete Spring Boot application. The difference from the first application is that the bean method is of type java.util.function.Function.
The first parameterized type for the Function is for the input KStream and the second one is for the output.
In the method body, a lambda expression of type Function is provided, and its implementation contains the actual business logic.
Similar to the previously discussed Consumer-based application, the input binding here is named process-in-0 by default. For the output, the binding name is automatically set to process-out-0.
Once built as an uber-jar (e.g., wordcount-processor.jar), you can run the above example as follows.
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
This application will consume messages from the Kafka topic words, and the computed results are published to the output topic counts.
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic required in the processor. Setting up Kafka Streams specific configuration required by the Kafka Streams infrastructure is automatically handled by the framework.
The two examples we saw above have a single KStream input binding. In both cases, the binding received the records from a single topic.
If you want to multiplex multiple topics into a single KStream binding, you can provide comma-separated Kafka topics as destinations, as shown below.
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
In addition, you can provide topic patterns as destinations if you want to match topics against a regular expression.
spring.cloud.stream.bindings.process-in-0.destination=input.*
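To get a feel for which topic names a pattern such as input.* would select, the following plain-Java sketch applies the regular expression to a list of candidate names. It assumes that a topic is selected only when the whole name matches the pattern; the TopicPatternDemo class and the sample topic names are hypothetical and are not part of the binder.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration: filters topic names with a regular expression,
// assuming the whole topic name has to match the pattern.
final class TopicPatternDemo {

    // Returns the topic names that the pattern matches in full.
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("input", "input-orders", "inputs", "my-input", "output");
        System.out.println(matching("input.*", topics));
    }
}
```

Note that in a regular expression the dot matches any character, so input.* selects every name that starts with input, not only names that contain a literal dot.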
Multiple Input Bindings
Many non-trivial Kafka Streams applications consume data from more than one topic through multiple bindings.
For instance, one topic is consumed as a KStream and another as a KTable or GlobalKTable.
There are many reasons why an application might want to receive data as a table type.
Think of a use-case where the underlying topic is populated through a change data capture (CDC) mechanism from a database or perhaps the application only cares about the latest updates for downstream processing.
If the application specifies that the data needs to be bound as a KTable or GlobalKTable, then the Kafka Streams binder will properly bind the destination to a KTable or GlobalKTable and make it available for the application to operate upon.
We will look at a few different scenarios of how multiple input bindings are handled in the Kafka Streams binder.
BiFunction in Kafka Streams Binder
Here is an example where we have two inputs and an output. In this case, the application can use java.util.function.BiFunction.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable,
                    (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}
Here again, the basic theme is the same as in the previous examples, but here we have two inputs.
Java’s BiFunction support is used to bind the inputs to the desired destinations.
The default binding names generated by the binder for the inputs are process-in-0 and process-in-1 respectively. The default output binding is process-out-0.
In this example, the first parameter of BiFunction is bound as a KStream for the first input and the second parameter is bound as a KTable for the second input.
BiConsumer in Kafka Streams Binder
If there are two inputs but no outputs, we can use java.util.function.BiConsumer as shown below.
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {};
}
Beyond two inputs
What if you have more than two inputs? There are situations in which you need more than two inputs. In that case, the binder allows you to chain partial functions. In functional programming jargon, this technique is generally known as currying. With the functional programming support added as part of Java 8, Java now enables you to write curried functions. Spring Cloud Stream Kafka Streams binder can make use of this feature to enable multiple input bindings.
Let’s see an example.
@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
    return orders -> (
            customers -> (
                    products -> (
                            orders.join(customers,
                                    (orderId, order) -> order.getCustomerId(),
                                    (order, customer) -> new CustomerOrder(customer, order))
                                    .join(products,
                                            (orderId, customerOrder) -> customerOrder.productId(),
                                            (customerOrder, product) -> {
                                                EnrichedOrder enrichedOrder = new EnrichedOrder();
                                                enrichedOrder.setProduct(product);
                                                enrichedOrder.setCustomer(customerOrder.customer);
                                                enrichedOrder.setOrder(customerOrder.order);
                                                return enrichedOrder;
                                            })
                    )
            )
    );
}
Let’s look at the details of the binding model presented above.
In this model, we have three partially applied functions on the inbound side. Let’s call them f(x), f(y) and f(z).
If we expand these functions in the sense of true mathematical functions, they look like this: f(x) → f(y) → f(z) → KStream<Long, EnrichedOrder>.
The x variable stands for KStream<Long, Order>, the y variable stands for GlobalKTable<Long, Customer> and the z variable stands for GlobalKTable<Long, Product>.
The first function f(x) has the first input binding of the application (KStream<Long, Order>) and its output is the function f(y).
The function f(y) has the second input binding for the application (GlobalKTable<Long, Customer>) and its output is yet another function, f(z).
The input for the function f(z) is the third input for the application (GlobalKTable<Long, Product>) and its output is KStream<Long, EnrichedOrder>, which is the final output binding for the application.
The inputs from the three partial functions, which are KStream, GlobalKTable and GlobalKTable respectively, are available to you in the method body for implementing the business logic as part of the lambda expression.
Input bindings are named enrichOrder-in-0, enrichOrder-in-1 and enrichOrder-in-2 respectively. The output binding is named enrichOrder-out-0.
With curried functions, you can have virtually any number of inputs. However, keep in mind that anything beyond a small number of inputs, each with its own partially applied function as above, can lead to unreadable Java code. Therefore, if your Kafka Streams application requires more than a reasonably small number of input bindings and you want to use this functional model, you may want to rethink your design and decompose the application appropriately.
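For readers new to currying, the following plain-Java sketch shows the same nesting of single-argument functions without any Kafka Streams types. The CurryingDemo class and the order-description logic are hypothetical and serve only to illustrate how each application of a function supplies one more input.

```java
import java.util.function.Function;

// Hypothetical illustration of currying with java.util.function.Function.
final class CurryingDemo {

    // Three inputs (customer, quantity, unit price) expressed as nested
    // single-argument functions, analogous to f(x) -> f(y) -> f(z) -> result.
    static Function<String, Function<Integer, Function<Integer, String>>> describeOrder() {
        return customer -> quantity -> unitPrice ->
                customer + " ordered " + quantity + " item(s), total " + (quantity * unitPrice);
    }

    public static void main(String[] args) {
        // Each apply call fixes one input and returns the next function.
        Function<Integer, Function<Integer, String>> forAlice = describeOrder().apply("alice");
        Function<Integer, String> twoItems = forAlice.apply(2);
        System.out.println(twoItems.apply(5));
    }
}
```

In the binder's model, each level of nesting corresponds to one input binding, which is why deeply nested signatures become hard to read quickly.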
Output Bindings
The Kafka Streams binder allows types of either KStream or KTable as output bindings.
Behind the scenes, the binder uses the to method on KStream to send the resultant records to the output topic.
If the application provides a KTable as output in the function, the binder still uses this technique by delegating to the to method of KStream.
For example, both functions below will work:
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}
Multiple Output Bindings
Kafka Streams allows writing outbound data into multiple topics. This feature is known as branching in Kafka Streams.
When using multiple output bindings, you need to provide an array of KStream (KStream[]) as the outbound return type.
Here is an example:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}
The programming model remains the same; however, the outbound parameterized type is KStream[].
The default output binding names are process-out-0, process-out-1 and process-out-2 respectively for the function above.
The binder generates three output bindings because it detects that the returned KStream array has a length of three.
Note that in this example we call noDefaultBranch(); if we had used defaultBranch() instead, that would have required an extra output binding, essentially returning a KStream array of length four.
Summary of Function based Programming Styles for Kafka Streams
In summary, the following table shows the various options that can be used in the functional paradigm.
Number of Inputs | Number of Outputs | Component to use |
---|---|---|
1 | 0 | java.util.function.Consumer |
2 | 0 | java.util.function.BiConsumer |
1 | 1..n | java.util.function.Function |
2 | 1..n | java.util.function.BiFunction |
>= 3 | 0..n | Use curried functions |

- In the case of more than one output in this table, the type simply becomes KStream[].
Function composition in Kafka Streams binder
The Kafka Streams binder supports minimal forms of functional composition for linear topologies.
Using the Java functional API support, you can write multiple functions and then compose them on your own using the andThen method.
For example, assume that you have the following two functions.
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    // Illustrative mapping (value length) so that the body matches the declared Long value type
    return input -> input.mapValues(value -> (long) value.length());
}
Even without the functional composition support in the binder, you can compose these two functions as below.
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    return foo().andThen(bar());
}
Then you can provide definitions of the form spring.cloud.function.definition=foo;bar;composed.
With the functional composition support in the binder, you don’t need to write this third function in which you are doing explicit function composition.
You can simply do this instead:
spring.cloud.function.definition=foo|bar
You can even do this:
spring.cloud.function.definition=foo|bar;foo;bar
The composed function’s default binding names in this example become foobar-in-0 and foobar-out-0.
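The composition mechanics can be tried out in plain Java, independent of Kafka Streams. In the sketch below, foo and bar are simple stand-in functions on strings, and composedName is a hypothetical helper that only mirrors the default naming shown above (the function names joined without the pipe character); neither is part of the binder.

```java
import java.util.function.Function;

// Hypothetical illustration of Function.andThen and of the default composed name.
final class CompositionDemo {

    // Stand-in for the first function: String in, String out.
    static Function<String, String> foo() {
        return String::trim;
    }

    // Stand-in for the second function: String in, Long out.
    static Function<String, Long> bar() {
        return value -> (long) value.length();
    }

    // Explicit composition: the output type of foo must match the input type of bar.
    static Function<String, Long> composed() {
        return foo().andThen(bar());
    }

    // Mirrors the documented default name: "foo|bar" yields the prefix "foobar".
    static String composedName(String definition) {
        return definition.replace("|", "");
    }

    public static void main(String[] args) {
        System.out.println(composed().apply("  kafka  "));
        System.out.println(composedName("foo|bar") + "-in-0");
    }
}
```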
Limitations of functional composition in Kafka Streams binder
When you have a java.util.function.Function bean, it can be composed with another function or with multiple functions.
The same function bean can be composed with a java.util.function.Consumer as well. In this case, the consumer is the last component composed.
A function can be composed with multiple functions and then end with a java.util.function.Consumer bean as well.
When composing beans of type java.util.function.BiFunction, the BiFunction must be the first function in the definition.
The composed entities must be of type either java.util.function.Function or java.util.function.Consumer.
In other words, you cannot take a BiFunction bean and then compose it with another BiFunction.
You cannot compose with types of BiConsumer or with definitions where a Consumer is the first component.
You also cannot compose with functions where the output is an array (KStream[] for branching) unless this is the last component in the definition.
The very first Function or BiFunction in the function definition may also use a curried form.
For example, the following is possible.
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
The function definition could then be curriedFoo|bar.
Behind the scenes, the binder will create two input bindings for the curried function, and an output binding based on the final function in the definition.
The default input bindings in this case are going to be curriedFoobar-in-0 and curriedFoobar-in-1.
The default output binding for this example becomes curriedFoobar-out-0.
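As a rough plain-Java analogy of composing a curried function with a following function, the sketch below chains the second function onto the innermost result of the curried one. How the binder performs this internally is not shown in this section, so treat the CurriedCompositionDemo class purely as a hypothetical illustration of the idea on strings.

```java
import java.util.function.Function;

// Hypothetical analogy: composing a two-input curried function with a
// single-input function, using strings instead of KStream and KTable.
final class CurriedCompositionDemo {

    // Curried function with two inputs; stands in for curriedFoo.
    static Function<String, Function<String, String>> curriedFoo() {
        return a -> b -> a + b;
    }

    // Single-input function; stands in for bar.
    static Function<String, String> bar() {
        return value -> value + "From-anotherFooFunc";
    }

    // Supplies the first input, then chains bar onto the inner function,
    // so the composed form still takes two inputs and yields one output.
    static Function<String, Function<String, String>> composed() {
        return a -> curriedFoo().apply(a).andThen(bar());
    }

    public static void main(String[] args) {
        System.out.println(composed().apply("x").apply("y"));
    }
}
```

The composed form keeps both inputs of the curried function and takes its output from the last function, which matches the two input bindings and single output binding described above.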
Special note on using KTable as output in function composition
Let’s say you have the following two functions.
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}
You can compose them as foo|bar, but keep in mind that the second function (bar in this case) must have a KTable as input, since the first function (foo) has a KTable as output.