Tips, Tricks and Recipes

Simple DLQ with Kafka

Problem Statement

As a developer, I want to write a consumer application that processes records from a Kafka topic. However, if some error occurs in processing, I don’t want the application to stop completely. Instead, I want to send the record in error to a DLT (Dead-Letter-Topic) and then continue processing new records.

Solution

The solution for this problem is to use the DLQ feature in Spring Cloud Stream. For the purposes of this discussion, let us assume that the following is our processor function.

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

This is a very trivial function that throws an exception for all the records that it processes, but you can take this function and extend it to any other similar situations.

In order to send the records in error to a DLT, we need to provide the following configuration.

spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq

In order to activate DLQ, the application must provide a group name. Anonymous consumers cannot use the DLQ facilities. We also need to enable DLQ by setting the enableDlq property on the Kafka consumer binding to true. Finally, we can optionally provide the DLT name by setting dlqName on the Kafka consumer binding; otherwise it defaults to error.input-topic.my-group in this case.

Note that in the example consumer provided above, the type of the payload is byte[]. By default, the DLQ producer in the Kafka binder expects a payload of type byte[]. If that is not the case, then we need to provide the configuration for the proper serializer. For example, let us rewrite the consumer function as below:

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

Now, we need to tell Spring Cloud Stream how we want to serialize the data when writing to the DLT. Here is the modified configuration for this scenario:

spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer

DLQ with Advanced Retry Options

Problem Statement

This is similar to the recipe above, but as a developer I would like to configure the way retries are handled.

Solution

If you followed the above recipe, then you get the default retry options built into the Kafka binder when the processing encounters an error.

By default, the binder retries for a maximum of 3 attempts with a one-second initial delay, a 2.0 multiplier on each back off, and a max delay of 10 seconds. You can change all these configurations as below:

spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
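
To see what the back-off settings mean in practice, it can help to work out the waits between attempts. The following is a minimal sketch in plain Java; the class and method names are illustrative and not part of the binder. It assumes the first wait equals the initial interval, each later wait is multiplied and then capped at the max interval, and there is no wait after the final attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper, not part of Spring Cloud Stream.
class BackOffSchedule {

    // Returns the waits (in milliseconds) between consecutive attempts.
    // Assumption: the first wait equals initialInterval and no wait
    // follows the final attempt.
    static List<Long> waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        double next = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) next, maxInterval));
            next = next * multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // 3 attempts, 1 s initial interval, 2.0 multiplier, 10 s cap.
        System.out.println(waits(3, 1000L, 2.0, 10000L)); // [1000, 2000]
        // A longer run shows the cap taking effect.
        System.out.println(waits(6, 1000L, 2.0, 10000L)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```

With 3 attempts there are only two waits, so the 10 second cap only matters once maxAttempts is raised.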

If you want, you can also provide a list of retryable exceptions by providing a map of boolean values. For example,

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

By default, any exceptions not listed in the map above will be retried. If that is not desired, then you can disable that by providing,

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
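
The interaction between the retryableExceptions map and defaultRetryable can be pictured as a lookup with a fallback. The sketch below is a simplified model in plain Java, not the binder's implementation (the binder delegates this decision to Spring Retry). The RetryDecision class is hypothetical, and the rule that a subclass inherits the setting of a listed parent exception is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only; not the binder's or Spring Retry's code.
class RetryDecision {

    private final Map<Class<? extends Throwable>, Boolean> retryableExceptions;
    private final boolean defaultRetryable;

    RetryDecision(Map<Class<? extends Throwable>, Boolean> retryableExceptions, boolean defaultRetryable) {
        this.retryableExceptions = new LinkedHashMap<>(retryableExceptions);
        this.defaultRetryable = defaultRetryable;
    }

    // Walks up the class hierarchy so that a subclass of a listed exception
    // inherits its setting (assumption); unlisted types use defaultRetryable.
    boolean shouldRetry(Throwable error) {
        for (Class<?> type = error.getClass(); type != null; type = type.getSuperclass()) {
            Boolean listed = retryableExceptions.get(type);
            if (listed != null) {
                return listed;
            }
        }
        return defaultRetryable;
    }
}
```

With the two entries shown earlier and defaultRetryable=false, only IllegalStateException (and, in this model, its subclasses) would be retried.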

You can also provide your own RetryTemplate and mark it as @StreamRetryTemplate which will be scanned and used by the binder. This is useful when you want more sophisticated retry strategies and policies.

If you have multiple @StreamRetryTemplate beans, then you can specify which one your binding wants by using the property,

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

Handling Deserialization errors with DLQ

Problem Statement

I have a processor that encounters a deserialization exception in Kafka consumer. I would expect that the Spring Cloud Stream DLQ mechanism will catch that scenario, but it does not. How can I handle this?

Solution

The normal DLQ mechanism offered by Spring Cloud Stream will not help when the Kafka consumer throws an irrecoverable deserialization exception. This is because the exception happens even before the consumer’s poll() method returns. The Spring for Apache Kafka project offers some great ways to help the binder with this situation. Let us explore those.

Assuming this is our function:

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

It is a trivial function that takes a String parameter.

We want to bypass the message converters provided by Spring Cloud Stream and want to use native deserializers instead. In the case of String types, it does not make much sense, but for more complex types like AVRO etc. you have to rely on external deserializers and therefore want to delegate the conversion to Kafka.

Now when the consumer receives the data, let us assume that there is a bad record that causes a deserialization error; for example, someone passed an Integer instead of a String. In that case, if you don’t do something in the application, the exception will be propagated through the chain and your application will eventually exit.

In order to handle this, you can add a ListenerContainerCustomizer @Bean that configures a DefaultErrorHandler. This DefaultErrorHandler is configured with a DeadLetterPublishingRecoverer. We also need to configure an ErrorHandlingDeserializer for the consumer. That sounds like a lot of complex things, but in reality, it boils down to these 3 beans in this case.

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

Let us analyze each of them. The first one is the ListenerContainerCustomizer bean that takes a DefaultErrorHandler. The container is now customized with that particular error handler. You can learn more about container customization here.

The second bean is the DefaultErrorHandler, which is configured with a DeadLetterPublishingRecoverer so that failed records are published to a DLT. See here for more details on DefaultErrorHandler.

The third bean is the DeadLetterPublishingRecoverer that is ultimately responsible for sending to the DLT. By default, the DLT is named ORIGINAL_TOPIC_NAME.DLT. You can change that, though. See the docs for more details.

We also need to configure an ErrorHandlingDeserializer through application config.

The ErrorHandlingDeserializer delegates to the actual deserializer. In case of errors, it sets key/value of the record to be null and includes the raw bytes of the message. It then sets the exception in a header and passes this record to the listener, which then calls the registered error handler.
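
The delegate-and-capture behavior described above can be illustrated without any Kafka classes. The following is a conceptual sketch in plain Java, not Spring's ErrorHandlingDeserializer; the SafeDeserializer and Result names are hypothetical. It only shows the idea of returning a null value together with the raw bytes and the exception instead of throwing.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Conceptual model only; this is not Spring's ErrorHandlingDeserializer.
class SafeDeserializer<T> {

    // Outcome of one deserialization attempt.
    static final class Result<T> {
        final T value;          // null when deserialization failed
        final byte[] rawBytes;  // original bytes, kept only on failure
        final Exception error;  // the failure, or null on success

        Result(T value, byte[] rawBytes, Exception error) {
            this.value = value;
            this.rawBytes = rawBytes;
            this.error = error;
        }

        boolean failed() {
            return error != null;
        }
    }

    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    // Never throws for a runtime failure in the delegate: the failure is
    // captured and handed on together with the raw bytes.
    Result<T> deserialize(byte[] data) {
        try {
            return new Result<>(delegate.apply(data), null, null);
        }
        catch (RuntimeException e) {
            return new Result<>(null, data, e);
        }
    }

    public static void main(String[] args) {
        SafeDeserializer<Integer> ints = new SafeDeserializer<>(
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        System.out.println(ints.deserialize("42".getBytes(StandardCharsets.UTF_8)).value);
        System.out.println(ints.deserialize("oops".getBytes(StandardCharsets.UTF_8)).failed());
    }
}
```

In the real setup the captured exception travels in a record header rather than in a result object, and the registered error handler decides what to do with it.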

Following is the configuration required:

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
        use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

We are providing the ErrorHandlingDeserializer through the configuration property on the binding. We are also indicating that the actual deserializer to delegate is the StringDeserializer.

Keep in mind that none of the dlq properties above are relevant to this recipe. They only address application-level errors.

Basic offset management in Kafka binder

Problem Statement

I want to write a Spring Cloud Stream Kafka consumer application, but I am not sure how it manages Kafka consumer offsets. Can you explain?

Solution

We encourage you to read the docs section on this to get a thorough understanding of it.

Here it is in a nutshell:

Kafka supports two types of offsets to start with by default - earliest and latest. Their semantics are self-explanatory from their names.

Assume that you are running the consumer for the first time. If you do not specify a group.id in your Spring Cloud Stream application, then it becomes an anonymous consumer. An anonymous consumer starts, by default, from the latest available offset in the topic partition. On the other hand, if you explicitly specify a group.id, then by default, the Spring Cloud Stream application starts from the earliest available offset in the topic partition.

In both cases above (consumers with explicit groups and anonymous groups), the starting offset can be switched around by using the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset and setting it to either earliest or latest.

Now, assume that you already ran the consumer before and are now starting it again. In this case, the starting offset semantics described above do not apply, as the consumer finds an already committed offset for the consumer group (in the case of an anonymous consumer, although the application does not provide a group.id, the binder auto-generates one for you). It simply picks up from the last committed offset onward. This is true even when you have a startOffset value provided.

However, you can override the default behavior where the consumer starts from the last committed offset by using the resetOffsets property. In order to do that, set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets to true (which is false by default). Then make sure you provide the startOffset value (either earliest or latest). When you do that, each time the consumer application starts, it behaves as if it were starting for the first time and ignores any committed offsets for the partition.
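
The rules in this recipe can be summarized as a small decision function. The sketch below is plain Java written only to restate the prose above; it is not binder code, and the StartingPosition class is hypothetical. Falling back to the group-based default when resetOffsets is true but startOffset is unset is an assumption of this sketch.

```java
// Illustrative decision table for the rules described above; not binder code.
class StartingPosition {

    // committedOffset: last committed offset for the group, or null if none exists.
    // startOffset: "earliest", "latest", or null when the property is not set.
    static String resolve(boolean hasExplicitGroup, Long committedOffset, String startOffset, boolean resetOffsets) {
        if (committedOffset != null && !resetOffsets) {
            // An existing commit wins, even if startOffset is set.
            return "committed:" + committedOffset;
        }
        if (startOffset != null) {
            return startOffset;
        }
        // Defaults: explicit group -> earliest, anonymous consumer -> latest.
        // Assumption: the same defaults apply if resetOffsets is true without a startOffset.
        return hasExplicitGroup ? "earliest" : "latest";
    }
}
```

For example, a consumer with a group, a committed offset of 42, and startOffset=latest resumes from the commit unless resetOffsets is true, in which case it starts from latest.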

Seeking to arbitrary offsets in Kafka

Problem Statement

Using Kafka binder, I know that it can set the offset to either earliest or latest, but I have a requirement to seek the offset to something in the middle, an arbitrary offset. Is there a way to achieve this using Spring Cloud Stream Kafka binder?

Solution

Previously we saw how Kafka binder allows you to tackle basic offset management. By default, the binder does not allow you to rewind to an arbitrary offset, at least through the mechanism we saw in that recipe. However, there are some low-level strategies that the binder provides to achieve this use case. Let’s explore them.

First of all, when you want to reset to an arbitrary offset other than earliest or latest, make sure to leave the resetOffsets configuration at its default, which is false. Then you have to provide a custom bean of type KafkaBindingRebalanceListener, which will be injected into all consumer bindings. It is an interface that comes with a few default methods, but here is the method that we are interested in:

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

Let us look at the details.

In essence, this method will be invoked each time during the initial assignment for a topic partition or after a rebalance. For better illustration, let us assume that our topic is foo and it has 4 partitions. Initially, we are only starting a single consumer in the group and this consumer will consume from all partitions. When the consumer starts for the first time, all 4 partitions are initially assigned. However, we do not want the partitions to start consuming at the default (earliest, since we define a group); rather, for each partition, we want to start consuming after seeking to an arbitrary offset. Imagine that you have a business case to consume from certain offsets as below.

Partition   start offset

0           1000
1           2000
2           2000
3           1000

This could be achieved by implementing the above method as below.

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

This is just a rudimentary implementation. Real world use cases are much more complex than this and you need to adjust accordingly, but this certainly gives you a basic sketch. When consumer seek fails, it may throw some runtime exceptions and you need to decide what to do in those cases.
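
The Javadoc above recommends keeping track of exactly which partitions have been sought when concurrency is greater than 1. One possible way to do that is sketched below in plain Java. SeekTracker is a hypothetical helper, and "topic-partition" strings stand in for Kafka's TopicPartition so that the example has no Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; "topic-partition" strings stand in for TopicPartition.
class SeekTracker {

    private final Map<String, Long> targetOffsets;
    private final Set<String> alreadySought = ConcurrentHashMap.newKeySet();

    SeekTracker(Map<String, Long> targetOffsets) {
        this.targetOffsets = new HashMap<>(targetOffsets);
    }

    // Returns the offset to seek to, at most once per partition across all threads.
    OptionalLong offsetToSeek(String topic, int partition) {
        String key = topic + "-" + partition;
        Long target = targetOffsets.get(key);
        // Set.add returns false if this partition was already claimed.
        if (target == null || !alreadySought.add(key)) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(target);
    }
}
```

Inside onPartitionsAssigned, the listener would call offsetToSeek for each assigned partition and seek only when a value is present. This sketch marks a partition as sought before the seek happens, so an application that needs to retry failed seeks would have to clear the entry again.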

What if we start a second consumer with the same group id?

When we add a second consumer, a rebalance will occur and some partitions will be moved around. Let’s say that the new consumer gets partitions 2 and 3. When this new Spring Cloud Stream consumer calls this onPartitionsAssigned method, it will see that this is the initial assignment for partitions 2 and 3 on this consumer. Therefore, it will do the seek operation because of the conditional check on the initial argument. The first consumer now only has partitions 0 and 1. However, for this consumer it was simply a rebalance event and is not considered an initial assignment. Thus, it will not re-seek to the given offsets because of the conditional check on the initial argument.

How do I manually acknowledge using Kafka binder?

Problem Statement

Using Kafka binder, I want to manually acknowledge messages in my consumer. How do I do that?

Solution

By default, Kafka binder delegates to the default commit settings in Spring for Apache Kafka project. The default ackMode in Spring Kafka is batch. See here for more details on that.

There are situations in which you want to disable this default commit behavior and rely on manual commits. The following steps allow you to do that.

Set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode to either MANUAL or MANUAL_IMMEDIATE. When it is set like that, then there will be a header called kafka_acknowledgment (from KafkaHeaders.ACKNOWLEDGMENT) present in the message received by the consumer method.

For example, imagine this as your consumer method.

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    };
}

Then you set the property spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode to MANUAL or MANUAL_IMMEDIATE.

How do I override the default binding names in Spring Cloud Stream?

Problem Statement

Spring Cloud Stream creates default bindings based on the function definition and signature, but how do I override these to more domain friendly names?

Solution

Assume that the following is your function signature.

@Bean
public Function<String, String> uppercase(){
...
}

By default, Spring Cloud Stream will create the bindings as below.

  1. uppercase-in-0

  2. uppercase-out-0

You can override these binding names by using the following properties.

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

After this, all binding properties must be made on the new names, my-transformer-in and my-transformer-out.

Here is another example with Kafka Streams and multiple inputs.

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

By default, Spring Cloud Stream will create three different binding names for this function.

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

You have to use these binding names each time you want to set some configuration on these bindings. If you don’t like that and want to use more domain-friendly and readable binding names, you could use something like the following.

  1. orders

  2. accounts

  3. enrichedOrders

You can easily do that by setting these three properties:

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

Once you do that, it overrides the default binding names and any properties that you want to set on them must be on these new binding names.
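
The naming convention and the effect of the overrides can be modeled in a few lines. The following plain Java sketch is illustrative only and does not reflect how the framework derives binding names internally; BindingNames and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: models the naming convention, not the framework's code.
class BindingNames {

    // Default names follow <functionName>-in-<index> and <functionName>-out-<index>.
    static List<String> defaults(String functionName, int inputs, int outputs) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < inputs; i++) {
            names.add(functionName + "-in-" + i);
        }
        for (int i = 0; i < outputs; i++) {
            names.add(functionName + "-out-" + i);
        }
        return names;
    }

    // Applies overrides in the style of spring.cloud.stream.function.bindings.*;
    // names without an override are kept unchanged.
    static List<String> effective(List<String> defaults, Map<String, String> overrides) {
        List<String> names = new ArrayList<>();
        for (String name : defaults) {
            names.add(overrides.getOrDefault(name, name));
        }
        return names;
    }
}
```

For the processOrder function with two inputs and one output, defaults yields the three names listed earlier, and effective maps them to orders, accounts and enrichedOrders when all three overrides are present.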

How do I send a message key as part of my record?

Problem Statement

I need to send a key along with the payload of the record, is there a way to do that in Spring Cloud Stream?

Solution

It is often necessary to send an associative data structure, like a map, as the record with a key and a value. Spring Cloud Stream allows you to do that in a straightforward manner. Following is a basic blueprint for doing this, but you may want to adapt it to your particular use case.

Here is a sample producer method (aka Supplier).

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

This is a trivial function that sends a message with a String payload, but also with a key. Note that we set the key as a message header using KafkaHeaders.MESSAGE_KEY.

If you want to take the key from a header other than the default kafka_messageKey, then specify this property in the configuration:

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

Please note that we use the binding name supplier-out-0 since that is our function name; please update accordingly.

Then, we set the key under this new header when we produce the message.

How do I use native serializer and deserializer instead of message conversion done by Spring Cloud Stream?

Problem Statement

Instead of using the message converters in Spring Cloud Stream, I want to use native Serializer and Deserializer in Kafka. By default, Spring Cloud Stream takes care of this conversion using its internal built-in message converters. How can I bypass this and delegate the responsibility to Kafka?

Solution

This is really easy to do.

All you have to do is to provide the following property to enable native serialization.

spring.cloud.stream.bindings.<binding-name>.producer.useNativeEncoding: true

Then, you need to also set the serializers. There are a couple of ways to do this.

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

or using the binder configuration.

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

When set at the binder level, the serializers apply to all bindings, whereas setting them at the binding level applies per binding.

On the deserializing side, you just need to provide the deserializers as configuration.

For example,

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

You can also set them at the binder level.

There is an optional property that you can set to force native decoding.

spring.cloud.stream.bindings.<binding-name>.consumer.useNativeDecoding: true

However, in the case of Kafka binder, this is unnecessary, as by the time it reaches the binder, Kafka already deserializes them using the configured deserializers.

Explain how offset resetting works in Kafka Streams binder

Problem Statement

By default, Kafka Streams binder always starts from the earliest offset for a new consumer. Sometimes, it is beneficial or required by the application to start from the latest offset. Kafka Streams binder allows you to do that.

Solution

Before we look at the solution, let us look at the following scenario.

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer() {
    return (s, t) -> s.join(t, ...)
    ...
}

We have a BiConsumer bean that requires two input bindings. In this case, the first binding is for a KStream and the second one is for a KTable. When running this application for the first time, by default, both bindings start from the earliest offset. What if I want to start from the latest offset due to some requirements? You can do this by setting the following properties.

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

If you want only one binding to start from the latest offset and the other to consume from the default earliest, then leave the latter binding out of the configuration.

Keep in mind that once there are committed offsets, these settings are not honored and the committed offsets take precedence.

Keeping track of successful sending of records (producing) in Kafka

Problem Statement

I have a Kafka producer application and I want to keep track of all my successful sends.

Solution

Let us assume that we have the following supplier in the application.

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

Then, we need to define a new MessageChannel bean to capture all the successful send information.

@Bean
public MessageChannel fooRecordChannel() {
    return new DirectChannel();
}

Next, define this property in the application configuration to provide the bean name for the recordMetadataChannel.

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

At this point, information about each successful send will be sent to the fooRecordChannel.

You can write an IntegrationFlow as below to see the information.

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

In the handle method, the payload is what got sent to Kafka and the message headers contain a special key called kafka_recordMetadata. Its value is a RecordMetadata that contains information about topic partition, current offset etc.

Adding custom header mapper in Kafka

Problem Statement

I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?

Solution

Under normal circumstances, custom headers set by the producer are propagated to the consumer without any extra work.

Imagine you have the following producer.

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

On the consumer side, you should still see the header "foo", and the following should not give you any issues.

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

If you provide a custom header mapper in the application, then this won’t work. Let’s say you have an empty KafkaHeaderMapper in the application.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

If that is your implementation, then you will miss the foo header on the consumer. Chances are that you have some logic inside those KafkaHeaderMapper methods. You need the following to populate the foo header.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
            target.put("foo", new String(foo.value()));
        }
    };
}

That will properly populate the foo header from the producer to consumer.
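
The mapper above assumes that the foo header is always present and relies on the platform default charset. A more defensive variant of the same copy logic is sketched below in plain Java. Ordinary maps stand in for MessageHeaders and Kafka's Headers so the example stays self-contained, and the FooHeaderCopy class is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Illustrative stand-in: plain maps replace MessageHeaders and Kafka's Headers.
class FooHeaderCopy {

    // Outbound direction: message header -> record header bytes.
    static void toRecord(Map<String, Object> messageHeaders, Map<String, byte[]> recordHeaders) {
        Object foo = messageHeaders.get("foo");
        if (foo != null) { // skip instead of failing when the header is absent
            recordHeaders.put("foo", foo.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    // Inbound direction: record header bytes -> message header.
    static void toMessage(Map<String, byte[]> recordHeaders, Map<String, Object> messageHeaders) {
        byte[] foo = recordHeaders.get("foo");
        if (foo != null) {
            messageHeaders.put("foo", new String(foo, StandardCharsets.UTF_8));
        }
    }
}
```

The same null checks and explicit charset can be applied inside the fromHeaders and toHeaders methods of a real KafkaHeaderMapper.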

Special note on the id header

In Spring Cloud Stream, the id header is a special header, but some applications may want to have special custom id headers, something like custom-id or ID or Id. The first one (custom-id) will propagate without any custom header mapper from producer to consumer. However, if you produce with a variant of the framework-reserved id header, such as ID, Id, iD etc., then you will run into issues with the internals of the framework. See this StackOverflow thread for more context on this use case. In that case, you must use a custom KafkaHeaderMapper to map the case-sensitive id header. For example, let’s say you have the following producer.

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

The header Id above will be gone from the consuming side as it clashes with the framework id header. You can provide a custom KafkaHeaderMapper to solve this issue.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
            target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
            target.put("Id", new String(Id.value()));
        }
    };
}

By doing this, both id and Id headers will be available from the producer to the consumer side.

Producing to multiple topics in transaction

Problem Statement

How do I produce transactional messages to multiple Kafka topics?

For more context, see this StackOverflow question.

Solution

Use transactional support in Kafka binder for transactions and then provide an AfterRollbackProcessor. In order to produce to multiple topics, use StreamBridge API.

Below are the code snippets for this:

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

Required Configuration

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.bindings.input-in-0.consumer.maxAttempts=1

In order to test, you can use the following:

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

Some important notes:

Please ensure that you don’t have any DLQ settings in the application configuration, as we manually configure the DLT (by default, it will be published to a topic named input.DLT based on the initial consumer function). Also, reset the maxAttempts on the consumer binding to 1 in order to avoid retries by the binder. In the example above, a record is tried at most 3 times in total (the initial try plus the 2 attempts in the FixedBackOff).

See the StackOverflow thread for more details on how to test this code. If you are using Spring Cloud Stream to test it by adding more consumer functions, make sure to set the isolation-level on the consumer binding to read-committed.

This StackOverflow thread is also related to this discussion.

Pitfalls to avoid when running multiple pollable consumers

Problem Statement

How can I run multiple instances of the pollable consumers and generate unique client.id for each instance?

Solution

Assuming that I have the following definition:

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

When running the application, the Kafka consumer generates a client.id (something like consumer-my-group-1). For each instance of the application that is running, this client.id will be the same, causing unexpected issues.

In order to fix this, you can add the following property on each instance of the application:

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
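
The ${client.id} placeholder needs a value that differs per instance. One possible way to generate such a value is sketched below in plain Java. ClientIds is a hypothetical helper and the foo-consumer prefix is only an example; how the value is handed to the application (for instance as a system property or a command-line argument) is left to the deployment.

```java
import java.util.UUID;

// Hypothetical helper: one way to produce a distinct value per application instance.
class ClientIds {

    // Combines a readable prefix with a random UUID so that two instances
    // are extremely unlikely to share an id.
    static String unique(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // The printed value changes on every run.
        System.out.println(unique("foo-consumer"));
    }
}
```

A stable identifier that the platform already provides, such as a host name or an instance index, would work equally well as long as it is unique across running instances.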

See this GitHub issue for more details.