Tips, Tricks and Examples
Manually Assigning All Partitions
Let’s say you want to always read all records from all partitions (such as when using a compacted topic to load a distributed cache), it can be useful to manually assign the partitions and not use Kafka’s group management. Doing so can be unwieldy when there are many partitions, because you have to list the partitions. It’s also an issue if the number of partitions changes over time, because you would have to recompile your application each time the partition count changes.
The following is an example of how to use the power of a SpEL expression to create the partition list dynamically when the application starts:
@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
partitions = "#{@finder.partitions('compacted')}"),
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
...
}
@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
return new PartitionFinder(consumerFactory);
}
public static class PartitionFinder {
private final ConsumerFactory<String, String> consumerFactory;
public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
this.consumerFactory = consumerFactory;
}
public String[] partitions(String topic) {
try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic).stream()
.map(pi -> "" + pi.partition())
.toArray(String[]::new);
}
}
}
Using this in conjunction with ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest
will load all records each time the application is started.
You should also set the container’s AckMode
to MANUAL
to prevent the container from committing offsets for a null
consumer group.
Starting with version 3.1, the container will automatically coerce the AckMode
to MANUAL
when manual topic assignment is used with no consumer group.id
.
However, starting with version 2.5.5, as shown above, you can apply an initial offset to all partitions; see Explicit Partition Assignment for more information.
Examples of Kafka Transactions with Other Transaction Managers
The following Spring Boot application is an example of chaining database and Kafka transactions.
The listener container starts the Kafka transaction and the @Transactional
annotation starts the DB transaction.
The DB transaction is committed first; if the Kafka transaction fails to commit, the record will be redelivered so the DB update should be idempotent.
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
}
@Bean
public DataSourceTransactionManager dstm(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Component
public static class Listener {
private final JdbcTemplate jdbcTemplate;
private final KafkaTemplate<String, String> kafkaTemplate;
public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
this.jdbcTemplate = jdbcTemplate;
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(id = "group1", topics = "topic1")
@Transactional("dstm")
public void listen1(String in) {
this.kafkaTemplate.send("topic2", in.toUpperCase());
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
}
@KafkaListener(id = "group2", topics = "topic2")
public void listen2(String in) {
System.out.println(in);
}
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("topic1").build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic2").build();
}
}
spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.producer.transaction-id-prefix=tx-
#logging.level.org.springframework.transaction=trace
#logging.level.org.springframework.kafka.transaction=debug
#logging.level.org.springframework.jdbc=debug
create table mytable (data varchar(20));
For producer-only transactions, transaction synchronization works:
@Transactional("dstm")
public void someMethod(String in) {
this.kafkaTemplate.send("topic2", in.toUpperCase());
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
}
The KafkaTemplate
will synchronize its transaction with the DB transaction and the commit/rollback occurs after the database.
If you wish to commit the Kafka transaction first, and only commit the DB transaction if the Kafka transaction is successful, use nested @Transactional
methods:
@Transactional("dstm")
public void someMethod(String in) {
this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
sendToKafka(in);
}
@Transactional("kafkaTransactionManager")
public void sendToKafka(String in) {
this.kafkaTemplate.send("topic2", in.toUpperCase());
}
Customizing the JsonSerializer and JsonDeserializer
The serializer and deserializer support a number of cusomizations using properties, see JSON for more information.
The kafka-clients
code, not Spring, instantiates these objects, unless you inject them directly into the consumer and producer factories.
If you wish to configure the (de)serializer using properties, but wish to use, say, a custom ObjectMapper
, simply create a subclass and pass the custom mapper into the super
constructor. For example:
public class CustomJsonSerializer extends JsonSerializer<Object> {
public CustomJsonSerializer() {
super(customizedObjectMapper());
}
private static ObjectMapper customizedObjectMapper() {
ObjectMapper mapper = JacksonUtils.enhancedObjectMapper();
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return mapper;
}
}