Configuring Topics

If you define a KafkaAdmin bean in your application context, it can automatically add topics to the broker. To do so, you can add a NewTopic @Bean for each topic to the application context. Version 2.3 introduced a new class TopicBuilder to make creation of such beans more convenient. The following example shows how to do so:

  • Java

  • Kotlin

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, List.of(0, 1))
            .assignReplicas(1, List.of(1, 2))
            .assignReplicas(2, List.of(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))

@Bean
fun topic1() =
    TopicBuilder.name("thing1")
        .partitions(10)
        .replicas(3)
        .compact()
        .build()

@Bean
fun topic2() =
    TopicBuilder.name("thing2")
        .partitions(10)
        .replicas(3)
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

@Bean
fun topic3() =
    TopicBuilder.name("thing3")
        .assignReplicas(0, Arrays.asList(0, 1))
        .assignReplicas(1, Arrays.asList(1, 2))
        .assignReplicas(2, Arrays.asList(2, 0))
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

Starting with version 2.6, you can omit partitions() and/or replicas() and the broker defaults will be applied to those properties. The broker version must be at least 2.4.0 to support this feature - see KIP-464.

  • Java

  • Kotlin

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()

@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()

@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()

Starting with version 2.7, you can declare multiple NewTopics in a single KafkaAdmin.NewTopics bean definition:

  • Java

  • Kotlin

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
    TopicBuilder.name("defaultBoth")
        .build(),
    TopicBuilder.name("defaultPart")
        .replicas(1)
        .build(),
    TopicBuilder.name("defaultRepl")
        .partitions(3)
        .build()
)
When using Spring Boot, a KafkaAdmin bean is automatically registered so you only need the NewTopic (and/or NewTopics) @Beans.

By default, if the broker is not available, a message is logged, but the context continues to load. You can programmatically invoke the admin’s initialize() method to try again later. If you wish this condition to be considered fatal, set the admin’s fatalIfBrokerNotAvailable property to true. The context then fails to initialize.

If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the NewTopic.numPartitions.

Starting with version 2.7, the KafkaAdmin provides methods to create and examine topics at runtime.

  • createOrModifyTopics

  • describeTopics

For more advanced features, you can use the AdminClient directly. The following example shows how to do so:

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

Starting with versions 2.9.10, 3.0.9, you can provide a Predicate<NewTopic> which can be used to determine whether a particular NewTopic bean should be considered for creation or modification. This is useful, for example, if you have multiple KafkaAdmin instances pointing to different clusters and you wish to select those topics that should be created or modified by each admin.

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));