Tips, Tricks and Recipes
Simple DLQ with Kafka
Problem Statement
As a developer, I want to write a consumer application that processes records from a Kafka topic. However, if some error occurs in processing, I don’t want the application to stop completely. Instead, I want to send the record in error to a DLT (Dead-Letter-Topic) and then continue processing new records.
Solution
The solution for this problem is to use the DLQ feature in Spring Cloud Stream. For the purposes of this discussion, let us assume that the following is our processor function.
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
This is a very trivial function that throws an exception for all the records that it processes, but you can take this function and extend it to any other similar situations.
In order to send the records in error to a DLT, we need to provide the following configuration.
spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
In order to activate DLQ, the application must provide a group name.
Anonymous consumers cannot use the DLQ facilities.
We also need to enable DLQ by setting the enableDlq property on the Kafka consumer binding to true.
Finally, we can optionally provide the DLT name by setting dlqName on the Kafka consumer binding; otherwise it defaults to error.input-topic.my-group in this case.
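The fallback naming rule described above can be sketched in plain Java. The class and method names are hypothetical and are not part of Spring Cloud Stream; only the error.<destination>.<group> pattern and the need for a group come from the text.

```java
// Hypothetical helper, not binder code: it only mirrors the naming rule
// described above (dlqName if set, else error.<destination>.<group>).
final class DlqNameSketch {

    static String resolveDlqName(String dlqName, String destination, String group) {
        if (dlqName != null && !dlqName.isEmpty()) {
            return dlqName; // an explicit dlqName always wins
        }
        if (group == null || group.isEmpty()) {
            // Anonymous consumers cannot use the DLQ facilities.
            throw new IllegalArgumentException("DLQ requires a consumer group");
        }
        return "error." + destination + "." + group;
    }

    public static void main(String[] args) {
        System.out.println(resolveDlqName(null, "input-topic", "my-group"));
        System.out.println(resolveDlqName("input-topic-dlq", "input-topic", "my-group"));
    }
}
```

With the configuration shown earlier, the explicit dlqName is used; removing it would fall back to the generated name.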
Note that in the example consumer provided above, the type of the payload is byte[].
By default, the DLQ producer in the Kafka binder expects a payload of type byte[].
If that is not the case, then we need to provide the configuration for the proper serializer.
For example, let us re-write the consumer function as below:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
Now we need to tell Spring Cloud Stream how we want to serialize the data when writing to the DLT. Here is the modified configuration for this scenario:
spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
DLQ with Advanced Retry Options
Problem Statement
This is similar to the recipe above, but as a developer I would like to configure the way retries are handled.
Solution
If you followed the above recipe, then you get the default retry options built into the Kafka binder when the processing encounters an error.
By default, the binder retries for a maximum of 3 attempts, with a one-second initial delay, a 2.0 multiplier for each back-off, and a max delay of 10 seconds. You can change all these settings with the following properties:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
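To see how these four values interact, the arithmetic of an exponential back-off can be sketched as follows. This is an illustration only, not the binder's retry implementation; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: reproduces the arithmetic of an exponential back-off,
// not the binder's actual retry code.
final class BackOffSketch {

    // Returns the pause (in ms) before each retry; maxAttempts includes the first try.
    static List<Long> delays(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add(Math.min((long) next, maxMs)); // cap each pause at maxMs
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(3, 1000, 2.0, 10000)); // the defaults named above
        System.out.println(delays(6, 1000, 2.0, 10000)); // more attempts reach the cap
    }
}
```

With the defaults, three attempts mean only two pauses, so the 10-second cap matters only when maxAttempts is raised.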
If you want, you can also provide a list of retryable exceptions by providing a map of boolean values. For example,
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
By default, any exceptions not listed in the map above will be retried. If that is not desired, then you can disable that by providing,
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
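The interplay between the retryableExceptions map and defaultRetryable can be sketched as a small lookup. This is a simplified stand-in with hypothetical names; it assumes that a subclass inherits the setting of a listed superclass and does not model exception causes.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the classification described above. Consulting
// superclasses is an assumption of this sketch; causes are not inspected.
final class RetryableSketch {

    static boolean isRetryable(Throwable error,
            Map<Class<? extends Throwable>, Boolean> retryableExceptions,
            boolean defaultRetryable) {
        // Walk up the class hierarchy until a configured entry is found.
        for (Class<?> type = error.getClass(); type != null; type = type.getSuperclass()) {
            Boolean configured = retryableExceptions.get(type);
            if (configured != null) {
                return configured;
            }
        }
        return defaultRetryable; // exception type is not listed in the map
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, Boolean> rules = new HashMap<>();
        rules.put(IllegalStateException.class, true);
        rules.put(IllegalArgumentException.class, false);
        System.out.println(isRetryable(new IllegalStateException(), rules, true));
        System.out.println(isRetryable(new IllegalArgumentException(), rules, true));
        System.out.println(isRetryable(new RuntimeException(), rules, false));
    }
}
```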
You can also provide your own RetryTemplate and mark it as @StreamRetryTemplate, which will be scanned and used by the binder.
This is useful when you want more sophisticated retry strategies and policies.
If you have multiple @StreamRetryTemplate beans, then you can specify which one your binding wants by using the property,
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
Handling Deserialization errors with DLQ
Problem Statement
I have a processor that encounters a deserialization exception in Kafka consumer. I would expect that the Spring Cloud Stream DLQ mechanism will catch that scenario, but it does not. How can I handle this?
Solution
The normal DLQ mechanism offered by Spring Cloud Stream will not help when the Kafka consumer throws an irrecoverable deserialization exception.
This is because this exception happens even before the consumer’s poll() method returns.
Spring for Apache Kafka project offers some great ways to help the binder with this situation.
Let us explore those.
Assuming this is our function:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
It is a trivial function that takes a String parameter.
We want to bypass the message converters provided by Spring Cloud Stream and use native deserializers instead.
In the case of String types, it does not make much sense, but for more complex types such as Avro, you have to rely on external deserializers and therefore want to delegate the conversion to Kafka.
Now when the consumer receives the data, let us assume that there is a bad record that causes a deserialization error; maybe someone passed an Integer instead of a String, for example.
In that case, if you don’t do something in the application, the exception will be propagated through the chain and your application will exit eventually.
In order to handle this, you can add a ListenerContainerCustomizer @Bean that configures a DefaultErrorHandler.
This DefaultErrorHandler is configured with a DeadLetterPublishingRecoverer.
We also need to configure an ErrorHandlingDeserializer for the consumer.
That sounds like a lot of complex things, but in reality, it boils down to these 3 beans in this case.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
Let us analyze each of them.
The first one is the ListenerContainerCustomizer bean that takes a DefaultErrorHandler.
The container is now customized with that particular error handler.
You can learn more about container customization here.
The second bean is the DefaultErrorHandler that is configured to publish to a DLT.
See here for more details on DefaultErrorHandler.
The third bean is the DeadLetterPublishingRecoverer that is ultimately responsible for sending to the DLT.
By default, the DLT topic is named ORIGINAL_TOPIC_NAME.DLT.
You can change that though.
See the docs for more details.
We also need to configure an ErrorHandlingDeserializer through application config.
The ErrorHandlingDeserializer delegates to the actual deserializer.
In case of errors, it sets the key/value of the record to null and includes the raw bytes of the message.
It then sets the exception in a header and passes this record to the listener, which then calls the registered error handler.
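The delegating behaviour described above can be illustrated with a small plain-Java stand-in. It is a sketch of the idea only: SafeDeserializerSketch and Result are hypothetical names, and the code does not use or reproduce the Spring for Apache Kafka classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for the delegating behaviour described above.
final class SafeDeserializerSketch<T> {

    // Outcome of one attempt: on failure the value is null and the error is kept.
    static final class Result<V> {
        final V value;
        final byte[] rawBytes;
        final Exception error;

        Result(V value, byte[] rawBytes, Exception error) {
            this.value = value;
            this.rawBytes = rawBytes;
            this.error = error;
        }
    }

    private final Function<byte[], T> delegate;

    SafeDeserializerSketch(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    Result<T> deserialize(byte[] data) {
        try {
            return new Result<>(delegate.apply(data), data, null);
        }
        catch (RuntimeException e) {
            // Do not fail the poll: pass a null value, the raw bytes and the error on.
            return new Result<>(null, data, e);
        }
    }

    public static void main(String[] args) {
        SafeDeserializerSketch<Integer> d = new SafeDeserializerSketch<>(
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        System.out.println(d.deserialize("42".getBytes(StandardCharsets.UTF_8)).value);
        System.out.println(d.deserialize("oops".getBytes(StandardCharsets.UTF_8)).error);
    }
}
```

Because the failure is captured instead of thrown, the caller (the error handler in the real setup) can decide to publish the raw bytes to a DLT.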
Following is the configuration required:
spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
        use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
We are providing the ErrorHandlingDeserializer through the configuration property on the binding.
We are also indicating that the actual deserializer to delegate to is the StringDeserializer.
Keep in mind that none of the DLQ properties above are relevant to the deserialization handling discussed in this recipe. They are meant purely for addressing application-level errors.
Basic offset management in Kafka binder
Problem Statement
I want to write a Spring Cloud Stream Kafka consumer application, and I am not sure how it manages Kafka consumer offsets. Can you explain?
Solution
We encourage you to read the docs section on this to get a thorough understanding of it.
Here it is in a nutshell:
Kafka supports two types of offsets to start with by default: earliest and latest.
Their semantics are self-explanatory from their names.
Assume you are running the consumer for the first time.
If you omit the group.id in your Spring Cloud Stream application, then it becomes an anonymous consumer.
Whenever you have an anonymous consumer, the Spring Cloud Stream application will by default start from the latest available offset in the topic partition.
On the other hand, if you explicitly specify a group.id, then by default the Spring Cloud Stream application will start from the earliest available offset in the topic partition.
In both cases above (consumers with explicit groups and anonymous groups), the starting offset can be switched by using the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset and setting it to either earliest or latest.
Now, assume that you already ran the consumer before and are now starting it again.
In this case, the starting offset semantics above do not apply, as the consumer finds an already committed offset for the consumer group (in the case of an anonymous consumer, although the application does not provide a group.id, the binder will auto-generate one for you).
It simply picks up from the last committed offset onward.
This is true even when you have a startOffset value provided.
However, you can override the default behavior where the consumer starts from the last committed offset by using the resetOffsets property.
In order to do that, set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets to true (which is false by default).
Then make sure you provide the startOffset value (either earliest or latest).
When you do that and then start the consumer application, each time it starts as if for the first time and ignores any committed offsets for the partition.
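The rules in this recipe can be condensed into a small decision function. All names here are hypothetical; the real binder expresses this through Kafka consumer configuration rather than a method like this. Falling back to the group-based default when resetOffsets is true but no startOffset is given is an assumption of this sketch.

```java
// Decision sketch for the rules described above; not binder code.
final class StartOffsetSketch {

    enum Start { EARLIEST, LATEST, COMMITTED }

    // startOffset may be null when the property is not set.
    static Start resolve(boolean anonymous, Start startOffset,
            boolean hasCommittedOffset, boolean resetOffsets) {
        // The default depends on whether a group was given; startOffset overrides it.
        Start fresh = (startOffset != null) ? startOffset
                : (anonymous ? Start.LATEST : Start.EARLIEST);
        if (hasCommittedOffset && !resetOffsets) {
            return Start.COMMITTED; // startOffset is ignored once offsets exist
        }
        return fresh;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, null, false, false));
        System.out.println(resolve(false, null, false, false));
        System.out.println(resolve(false, Start.LATEST, true, false));
        System.out.println(resolve(false, Start.LATEST, true, true));
    }
}
```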
Seeking to arbitrary offsets in Kafka
Problem Statement
Using Kafka binder, I know that it can set the offset to either earliest or latest, but I have a requirement to seek the offset to something in the middle, an arbitrary offset.
Is there a way to achieve this using Spring Cloud Stream Kafka binder?
Solution
Previously we saw how Kafka binder allows you to tackle basic offset management. By default, the binder does not allow you to rewind to an arbitrary offset, at least through the mechanism we saw in that recipe. However, there are some low-level strategies that the binder provides to achieve this use case. Let’s explore them.
First of all, when you want to reset to an arbitrary offset other than earliest or latest, make sure to leave the resetOffsets configuration at its default, which is false.
Then you have to provide a custom bean of type KafkaBindingRebalanceListener, which will be injected into all consumer bindings.
It is an interface that comes with a few default methods, but here is the method that we are interested in:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
Let us look at the details.
In essence, this method will be invoked each time during the initial assignment for a topic partition or after a rebalance.
For better illustration, let us assume that our topic is foo and it has 4 partitions.
Initially, we are only starting a single consumer in the group and this consumer will consume from all partitions.
When the consumer starts for the first time, all 4 partitions are initially assigned.
However, we do not want the partitions to start consuming at the defaults (earliest, since we define a group); rather, for each partition, we want to consume after seeking to an arbitrary offset.
Imagine that you have a business case to consume from certain offsets as below.
Partition    Start offset
0            1000
1            2000
2            2000
3            1000
This could be achieved by implementing the above method as below.
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
This is just a rudimentary implementation.
Real world use cases are much more complex than this and you need to adjust accordingly, but this certainly gives you a basic sketch.
When the consumer seek fails, it may throw runtime exceptions, and you need to decide what to do in those cases.
What if we start a second consumer with the same group id?
When we add a second consumer, a rebalance will occur and some partitions will be moved around.
Let’s say that the new consumer gets partitions 2 and 3.
When this new Spring Cloud Stream consumer calls this onPartitionsAssigned method, it will see that this is the initial assignment for partitions 2 and 3 on this consumer.
Therefore, it will do the seek operation because of the conditional check on the initial argument.
The first consumer now only has partitions 0 and 1.
However, for this consumer it was simply a rebalance event and not considered an initial assignment.
Thus, it will not re-seek to the given offsets because of the conditional check on the initial argument.
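The Javadoc quoted earlier recommends keeping track of which partitions have already been sought when concurrency is greater than 1. A minimal sketch of that bookkeeping, independent of Kafka classes, might look like this. Partitions are represented as plain strings such as "foo-2", and SeekTrackerSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the bookkeeping suggested by the Javadoc above; not binder code.
final class SeekTrackerSketch {

    private final Map<String, Long> targetOffsets;
    private final Set<String> alreadySought = ConcurrentHashMap.newKeySet();

    SeekTrackerSketch(Map<String, Long> targetOffsets) {
        this.targetOffsets = targetOffsets;
    }

    // Returns the partitions that should be sought for this assignment callback.
    List<String> partitionsToSeek(Collection<String> assigned, boolean initial) {
        List<String> toSeek = new ArrayList<>();
        if (!initial) {
            return toSeek; // plain rebalance: keep the current positions
        }
        for (String partition : assigned) {
            // add() returns false if another thread already handled this partition
            if (targetOffsets.containsKey(partition) && alreadySought.add(partition)) {
                toSeek.add(partition);
            }
        }
        return toSeek;
    }

    public static void main(String[] args) {
        SeekTrackerSketch tracker = new SeekTrackerSketch(
                Map.of("foo-0", 1000L, "foo-1", 2000L, "foo-2", 2000L, "foo-3", 1000L));
        System.out.println(tracker.partitionsToSeek(List.of("foo-0", "foo-1"), true));
        System.out.println(tracker.partitionsToSeek(List.of("foo-0", "foo-1"), false));
    }
}
```

The tracker only covers threads inside one application instance. A second instance, as in the scenario above, has its own tracker and would still seek its newly assigned partitions.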
How do I manually acknowledge using Kafka binder?
Problem Statement
Using Kafka binder, I want to manually acknowledge messages in my consumer. How do I do that?
Solution
By default, Kafka binder delegates to the default commit settings in the Spring for Apache Kafka project.
The default ackMode in Spring Kafka is batch.
See here for more details on that.
There are situations in which you want to disable this default commit behavior and rely on manual commits. The following steps allow you to do that.
Set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode to either MANUAL or MANUAL_IMMEDIATE.
When it is set like that, there will be a header called kafka_acknowledgment (from KafkaHeaders.ACKNOWLEDGMENT) present in the message received by the consumer method.
For example, imagine this as your consumer method.
@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    };
}
Then you set the property spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode to MANUAL or MANUAL_IMMEDIATE.
How do I override the default binding names in Spring Cloud Stream?
Problem Statement
Spring Cloud Stream creates default bindings based on the function definition and signature, but how do I override these to more domain friendly names?
Solution
Assume that the following is your function signature.
@Bean
public Function<String, String> uppercase(){
...
}
By default, Spring Cloud Stream will create the bindings as below.
- uppercase-in-0
- uppercase-out-0
You can override these binding names by using the following properties.
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
After this, all binding properties must be set on the new names, my-transformer-in and my-transformer-out.
Here is another example with Kafka Streams and multiple inputs.
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
By default, Spring Cloud Stream will create three different binding names for this function.
- processOrder-in-0
- processOrder-in-1
- processOrder-out-0
You have to use these binding names each time you want to set some configuration on these bindings. You don’t like that, and you want to use more domain-friendly and readable binding names, for example:
- orders
- accounts
- enrichedOrders
You can easily do that by setting these three properties:
- spring.cloud.stream.function.bindings.processOrder-in-0=orders
- spring.cloud.stream.function.bindings.processOrder-in-1=accounts
- spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
Once you do that, it overrides the default binding names and any properties that you want to set on them must be on these new binding names.
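The naming convention and the effect of the overrides can be sketched in a few lines of plain Java. BindingNameSketch is a hypothetical helper that mirrors the convention shown above; it is not how the framework computes the names internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the <function>-in-<i> / <function>-out-<i> convention.
final class BindingNameSketch {

    static List<String> bindingNames(String function, int inputs, int outputs,
            Map<String, String> overrides) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < inputs; i++) {
            names.add(function + "-in-" + i);
        }
        for (int i = 0; i < outputs; i++) {
            names.add(function + "-out-" + i);
        }
        // Apply overrides given as <default name> -> <custom name>.
        names.replaceAll(name -> overrides.getOrDefault(name, name));
        return names;
    }

    public static void main(String[] args) {
        System.out.println(bindingNames("processOrder", 2, 1, Map.of()));
        System.out.println(bindingNames("processOrder", 2, 1, Map.of(
                "processOrder-in-0", "orders",
                "processOrder-in-1", "accounts",
                "processOrder-out-0", "enrichedOrders")));
    }
}
```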
How do I send a message key as part of my record?
Problem Statement
I need to send a key along with the payload of the record, is there a way to do that in Spring Cloud Stream?
Solution
It is often necessary to send an associative data structure like a map as the record, with a key and a value. Spring Cloud Stream allows you to do that in a straightforward manner. Following is a basic blueprint for doing this, but you may want to adapt it to your particular use case.
Here is a sample producer method (aka Supplier).
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
This is a trivial function that sends a message with a String payload, but also with a key.
Note that we set the key as a message header using KafkaHeaders.MESSAGE_KEY.
If you want to change the key from the default kafka_messageKey, then in the configuration, we need to specify this property:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
Please note that we use the binding name supplier-out-0 since supplier is our function name; update it according to your own function name.
Then, we use this new key when we produce the message.
How do I use native serializer and deserializer instead of message conversion done by Spring Cloud Stream?
Problem Statement
Instead of using the message converters in Spring Cloud Stream, I want to use native Serializer and Deserializer in Kafka. By default, Spring Cloud Stream takes care of this conversion using its internal built-in message converters. How can I bypass this and delegate the responsibility to Kafka?
Solution
This is really easy to do.
All you have to do is to provide the following property to enable native serialization.
spring.cloud.stream.bindings.<binding-name>.producer.useNativeEncoding: true
Then, you need to also set the serializers. There are a couple of ways to do this.
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
or using the binder configuration.
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
When set at the binder level, the serializers apply to all bindings, whereas setting them at the binding level applies them per binding.
On the deserializing side, you just need to provide the deserializers as configuration.
For example,
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
You can also set them at the binder level.
There is an optional property that you can set to force native decoding.
spring.cloud.stream.bindings.<binding-name>.consumer.useNativeDecoding: true
However, in the case of Kafka binder, this is unnecessary, as by the time it reaches the binder, Kafka already deserializes them using the configured deserializers.
Explain how offset resetting works in Kafka Streams binder
Problem Statement
By default, Kafka Streams binder always starts from the earliest offset for a new consumer. Sometimes, it is beneficial or required by the application to start from the latest offset. Kafka Streams binder allows you to do that.
Solution
Before we look at the solution, let us look at the following scenario.
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer() {
    return (s, t) -> s.join(t, ...)
        ...
}
We have a BiConsumer bean that requires two input bindings.
In this case, the first binding is for a KStream and the second one is for a KTable.
When running this application for the first time, by default, both bindings start from the earliest offset.
What if I want to start from the latest offset due to some requirements?
You can do this by setting the following properties.
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
If you want only one binding to start from the latest offset and the other to consume from the default earliest, then leave the latter binding out of the configuration.
Keep in mind that once there are committed offsets, these settings are not honored and the committed offsets take precedence.
Keeping track of successful sending of records (producing) in Kafka
Problem Statement
I have a Kafka producer application and I want to keep track of all my successful sends.
Solution
Let us assume that we have the following supplier in the application.
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
Then, we need to define a new MessageChannel bean to capture all the successful send information.
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
Next, define this property in the application configuration to provide the bean name for the recordMetadataChannel.
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
At this point, information about each successful send will be sent to the fooRecordChannel.
You can write an IntegrationFlow as below to see the information.
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
In the handle method, the payload is what got sent to Kafka and the message headers contain a special key called kafka_recordMetadata.
Its value is a RecordMetadata that contains information about the topic partition, current offset etc.
Adding custom header mapper in Kafka
Problem Statement
I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?
Solution
Under normal circumstances, this should be fine.
Imagine, you have the following producer.
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
On the consumer side, you should still see the header "foo", and the following should not give you any issues.
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
If you provide a custom header mapper in the application, then this won’t work.
Let’s say you have an empty KafkaHeaderMapper in the application.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
If that is your implementation, then you will miss the foo header on the consumer.
Chances are that you have some logic inside those KafkaHeaderMapper methods.
You need the following to populate the foo header.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            // Skip the header when the outgoing message does not carry it.
            if (foo != null) {
                target.add("foo", foo.getBytes());
            }
        }
        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
            // Skip the header when the incoming record does not carry it.
            if (foo != null) {
                target.put("foo", new String(foo.value()));
            }
        }
    };
}
That will properly populate the foo header from the producer to consumer.
Special note on the id header
In Spring Cloud Stream, the id header is a special header, but some applications may want to have special custom id headers, something like custom-id or ID or Id.
The first one (custom-id) will propagate without any custom header mapper from producer to consumer.
However, if you produce with a variant of the framework-reserved id header, such as ID, Id, iD etc., then you will run into issues with the internals of the framework.
See this StackOverflow thread for more context on this use case.
In that case, you must use a custom KafkaHeaderMapper to map the case-sensitive id header.
For example, let’s say you have the following producer.
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
The header Id above will be gone from the consuming side as it clashes with the framework id header.
You can provide a custom KafkaHeaderMapper to solve this issue.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
            // Skip the header when the outgoing message does not carry it.
            if (myId != null) {
                target.add("Id", myId.getBytes());
            }
        }
        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
            // Skip the header when the incoming record does not carry it.
            if (Id != null) {
                target.put("Id", new String(Id.value()));
            }
        }
    };
}
By doing this, both id and Id headers will be available from the producer to the consumer side.
Producing to multiple topics in transaction
Problem Statement
How do I produce transactional messages to multiple Kafka topics?
For more context, see this StackOverflow question.
Solution
Use transactional support in Kafka binder for transactions and then provide an AfterRollbackProcessor.
In order to produce to multiple topics, use the StreamBridge API.
Below are the code snippets for this:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
Required Configuration
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.bindings.input-in-0.consumer.maxAttempts=1
In order to test, you can use the following:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
Some important notes:
Please ensure that you don’t have any DLQ settings in the application configuration, as we manually configure the DLT (by default it will be published to a topic named input.DLT, based on the initial consumer function).
Also, reset maxAttempts on the consumer binding to 1 in order to avoid retries by the binder.
The record will be tried at most 3 times in total in the example above (the initial try plus the 2 attempts in the FixedBackOff).
See the StackOverflow thread for more details on how to test this code.
If you are using Spring Cloud Stream to test it by adding more consumer functions, make sure to set the isolation-level on the consumer binding to read-committed.
This StackOverflow thread is also related to this discussion.
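The attempt count in the notes above is simple arithmetic, sketched here to show why maxAttempts is reset to 1. The sketch assumes that binder-level retries run inside every delivery, so the two retry layers multiply; that reading, and the names used, are assumptions rather than something stated in this recipe.

```java
// Arithmetic sketch only. It assumes binder-level retries (maxAttempts) run
// inside every delivery, so the two retry layers multiply.
final class AttemptCountSketch {

    static int totalInvocations(int binderMaxAttempts, int backOffRetries) {
        int deliveries = 1 + backOffRetries; // initial delivery + redeliveries after rollback
        return binderMaxAttempts * deliveries;
    }

    public static void main(String[] args) {
        System.out.println(totalInvocations(1, 2)); // settings used in this recipe
        System.out.println(totalInvocations(3, 2)); // binder default maxAttempts left in place
    }
}
```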
Pitfalls to avoid when running multiple pollable consumers
Problem Statement
How can I run multiple instances of the pollable consumers and generate a unique client.id for each instance?
Solution
Assuming that I have the following definition:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
When running the application, the Kafka consumer generates a client.id (something like consumer-my-group-1).
For each instance of the application that is running, this client.id will be the same, causing unexpected issues.
In order to fix this, you can add the following property on each instance of the application:
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
See this GitHub issue for more details.
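One way to supply a distinct value for the ${client.id} placeholder is to derive it per instance before the application starts. The sketch below is hypothetical: the naming scheme and the class are illustrative, and it assumes the placeholder is resolved from a JVM system property.

```java
import java.util.UUID;

// Hypothetical way to derive a distinct client.id per application instance.
final class ClientIdSketch {

    static String uniqueClientId(String group) {
        // A random suffix keeps instances of the same group apart.
        return "consumer-" + group + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Set the value under the key referenced by ${client.id} before the
        // Spring application context is started.
        System.setProperty("client.id", uniqueClientId("my-group"));
        System.out.println(System.getProperty("client.id"));
    }
}
```

Passing the value on the command line (for example as a -Dclient.id=... JVM argument) would serve the same purpose without any code.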