Tips, Tricks and Examples

Manually Assigning All Partitions

Suppose you always want to read all records from all partitions (for example, when using a compacted topic to load a distributed cache). In that case, it can be useful to manually assign the partitions and not use Kafka’s group management. Doing so can be unwieldy when there are many partitions, because you have to list them all. It is also a problem if the number of partitions changes over time, because you would have to recompile your application each time the partition count changes.

The following example uses a SpEL expression to create the partition list dynamically when the application starts:

@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
            partitions = "#{@finder.partitions('compacted')}",
            partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_KEY) String key, String payload) {
    ...
}

@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
    return new PartitionFinder(consumerFactory);
}

public static class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public String[] partitions(String topic) {
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                .map(pi -> "" + pi.partition())
                .toArray(String[]::new);
        }
    }

}

Using this in conjunction with ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest loads all records each time the application starts. You should also set the container’s AckMode to MANUAL to prevent the container from committing offsets for a null consumer group. Starting with version 3.1, the container automatically coerces the AckMode to MANUAL when manual topic assignment is used with no consumer group.id. Alternatively, starting with version 2.5.5, you can apply an initial offset to all partitions, as shown above; see Explicit Partition Assignment for more information.
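To make the result of the SpEL expression concrete, the following minimal sketch reproduces only the mapping step of PartitionFinder.partitions() in plain Java. The class name PartitionListSketch and the list of partition numbers are hypothetical stand-ins for the metadata that the consumer would return; no broker is contacted.

```java
import java.util.List;

// Hypothetical helper, not part of the original example.
class PartitionListSketch {

    // Same mapping as PartitionFinder.partitions(): one string per partition number.
    static String[] toPartitionList(List<Integer> partitionNumbers) {
        return partitionNumbers.stream()
                .map(String::valueOf)
                .toArray(String[]::new);
    }

}
```

For a topic with three partitions, the expression would therefore resolve to the array {"0", "1", "2"}, which is the form the partitions attribute expects.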

Examples of Kafka Transactions with Other Transaction Managers

The following Spring Boot application is an example of chaining database and Kafka transactions. The listener container starts the Kafka transaction and the @Transactional annotation starts the DB transaction. The DB transaction is committed first; if the Kafka transaction fails to commit, the record will be redelivered, so the DB update should be idempotent.

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
    }

    @Bean
    public DataSourceTransactionManager dstm(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Component
    public static class Listener {

        private final JdbcTemplate jdbcTemplate;

        private final KafkaTemplate<String, String> kafkaTemplate;

        public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
            this.jdbcTemplate = jdbcTemplate;
            this.kafkaTemplate = kafkaTemplate;
        }

        @KafkaListener(id = "group1", topics = "topic1")
        @Transactional("dstm")
        public void listen1(String in) {
            this.kafkaTemplate.send("topic2", in.toUpperCase());
            // Bind the value as a parameter instead of concatenating it into the SQL string.
            this.jdbcTemplate.update("insert into mytable (data) values (?)", in);
        }

        @KafkaListener(id = "group2", topics = "topic2")
        public void listen2(String in) {
            System.out.println(in);
        }

    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic1").build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic2").build();
    }

}

spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed

spring.kafka.producer.transaction-id-prefix=tx-

#logging.level.org.springframework.transaction=trace
#logging.level.org.springframework.kafka.transaction=debug
#logging.level.org.springframework.jdbc=debug

create table mytable (data varchar(20));
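The idempotency requirement can be illustrated without Kafka or a database. The following is a minimal sketch, assuming each record is identified by its topic, partition, and offset; the IdempotentStore class is a hypothetical in-memory stand-in for mytable, which in the example above has no such key column.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for the table; not part of the original example.
class IdempotentStore {

    // Identifiers (topic-partition@offset) of records that were already applied.
    private final Set<String> applied = new HashSet<>();

    // The rows that the table would contain.
    private final List<String> rows = new ArrayList<>();

    // Applies the update once per record identifier; returns false for a redelivery.
    boolean insertOnce(String topic, int partition, long offset, String data) {
        String recordId = topic + "-" + partition + "@" + offset;
        if (!this.applied.add(recordId)) {
            return false; // already applied, so the redelivered record changes nothing
        }
        this.rows.add(data);
        return true;
    }

    List<String> rows() {
        return Collections.unmodifiableList(new ArrayList<>(this.rows));
    }

}
```

In a real database, the same effect is usually obtained with a unique key on the record identifier, so that a redelivered record cannot produce a second row.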

For producer-only transactions, transaction synchronization works:

@Transactional("dstm")
public void someMethod(String in) {
    this.kafkaTemplate.send("topic2", in.toUpperCase());
    // Bind the value as a parameter instead of concatenating it into the SQL string.
    this.jdbcTemplate.update("insert into mytable (data) values (?)", in);
}

The KafkaTemplate synchronizes its transaction with the DB transaction, and the Kafka commit or rollback occurs after the database commit or rollback.

If you wish to commit the Kafka transaction first, and only commit the DB transaction if the Kafka transaction is successful, use nested @Transactional methods:

@Transactional("dstm")
public void someMethod(String in) {
    // Bind the value as a parameter instead of concatenating it into the SQL string.
    this.jdbcTemplate.update("insert into mytable (data) values (?)", in);
    sendToKafka(in);
}

@Transactional("kafkaTransactionManager")
public void sendToKafka(String in) {
    this.kafkaTemplate.send("topic2", in.toUpperCase());
}
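One caveat applies when the two methods live in the same class: assuming Spring's default proxy-based transaction interception, an annotation on the nested method takes effect only if the call passes through the proxy, so the nested method is typically placed on a separate bean. The following sketch illustrates the effect with a plain JDK dynamic proxy rather than Spring itself; MessageService, SelfInvokingService, and ProxyDemo are hypothetical names.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface standing in for the bean that declares both methods.
interface MessageService {

    void someMethod(String in);

    void sendToKafka(String in);

}

class SelfInvokingService implements MessageService {

    @Override
    public void someMethod(String in) {
        sendToKafka(in); // direct call on "this": it never reaches the proxy
    }

    @Override
    public void sendToKafka(String in) {
        // the Kafka send would happen here
    }

}

class ProxyDemo {

    // Returns the names of the methods that the proxy intercepted, in call order.
    static List<String> interceptedCalls() {
        List<String> intercepted = new ArrayList<>();
        MessageService target = new SelfInvokingService();
        MessageService proxy = (MessageService) Proxy.newProxyInstance(
                MessageService.class.getClassLoader(),
                new Class<?>[] { MessageService.class },
                (p, method, args) -> {
                    // an interceptor would begin a transaction at this point
                    intercepted.add(method.getName());
                    return method.invoke(target, args);
                });
        proxy.someMethod("test");
        return intercepted;
    }

}
```

Only someMethod is intercepted here; the inner call to sendToKafka bypasses the proxy. Moving sendToKafka to a second bean and calling it through an injected reference lets the second interceptor run.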

Customizing the JsonSerializer and JsonDeserializer

The serializer and deserializer support a number of customizations using properties; see JSON for more information. The kafka-clients code, not Spring, instantiates these objects, unless you inject them directly into the consumer and producer factories. If you wish to configure the (de)serializer using properties but also want to use, say, a custom ObjectMapper, create a subclass and pass the custom mapper into the super constructor. For example:

public class CustomJsonSerializer extends JsonSerializer<Object> {

    public CustomJsonSerializer() {
        super(customizedObjectMapper());
    }

    private static ObjectMapper customizedObjectMapper() {
        ObjectMapper mapper = JacksonUtils.enhancedObjectMapper();
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }

}
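Because the subclass has a no-arg constructor, it can be named in the producer properties, so that kafka-clients instantiates it and the property-based configuration still applies. The following is a minimal sketch using plain string keys; the com.example package, the ProducerProps class, and the broker address are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the producer configuration; not part of the original example.
class ProducerProps {

    // Builds producer properties that refer to the subclass by its class name.
    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.example.CustomJsonSerializer"); // hypothetical package
        props.put("spring.json.add.type.headers", false); // still configured through properties
        return props;
    }

}
```

A map like this would typically be passed to the producer factory; the custom ObjectMapper comes from the subclass constructor, while the remaining options come from the properties.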