Transactions

This section describes how Spring for Apache Kafka supports transactions.

Overview

The 0.11.0.0 client library added support for transactions. Spring for Apache Kafka adds support in the following ways:

  • KafkaTransactionManager: Used with normal Spring transaction support (@Transactional, TransactionTemplate, and others)

  • Transactional KafkaMessageListenerContainer

  • Local transactions with KafkaTemplate

  • Transaction synchronization with other transaction managers

Transactions are enabled by providing the DefaultKafkaProducerFactory with a transactionIdPrefix. In that case, instead of managing a single shared Producer, the factory maintains a cache of transactional producers. When the user calls close() on a producer, it is returned to the cache for reuse instead of actually being closed. The transactional.id property of each producer is transactionIdPrefix + n, where n starts with 0 and is incremented for each new producer. In previous versions of Spring for Apache Kafka, the transactional.id was generated differently for transactions started by a listener container with a record-based listener, to support fencing zombies; that is no longer necessary, because EOSMode.V2 is the only option starting with 3.0. For applications running with multiple instances, the transactionIdPrefix must be unique per instance.
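
The following is a minimal sketch of enabling transactions on the factory (the broker address, serializers, and prefix value are illustrative):

Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
// setting a prefix switches the factory to caching transactional producers
pf.setTransactionIdPrefix("tx-");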

With Spring Boot, it is only necessary to set the spring.kafka.producer.transaction-id-prefix property; Spring Boot automatically configures a KafkaTransactionManager bean and wires it into the listener container.
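
For example, in application.properties (the prefix value is illustrative):

spring.kafka.producer.transaction-id-prefix=tx-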

Starting with version 2.5.8, you can configure the maxAge property on the producer factory. This is useful when using transactional producers that might lie idle longer than the broker’s transactional.id.expiration.ms. With current kafka-clients, this can cause a ProducerFencedException without a rebalance. By setting maxAge to less than transactional.id.expiration.ms, the factory refreshes the producer if it is past its maximum age.
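
For example, a sketch that refreshes producers idle for more than one day (the duration is illustrative; the broker default for transactional.id.expiration.ms is seven days):

pf.setMaxAge(Duration.ofDays(1)); // java.time.Duration; keep this below transactional.id.expiration.ms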

Using KafkaTransactionManager

The KafkaTransactionManager is an implementation of Spring Framework’s PlatformTransactionManager. It is provided with a reference to the producer factory in its constructor. If you provide a custom producer factory, it must support transactions. See ProducerFactory.transactionCapable().

You can use the KafkaTransactionManager with normal Spring transaction support (@Transactional, TransactionTemplate, and others). If a transaction is active, any KafkaTemplate operations performed within the scope of the transaction use the transaction’s Producer. The manager commits or rolls back the transaction, depending on success or failure. You must configure the KafkaTemplate to use the same ProducerFactory as the transaction manager.
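
A minimal configuration sketch, assuming the transactional producer factory shown earlier:

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> pf) {
    return new KafkaTransactionManager<>(pf);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf); // same factory as the transaction manager
}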

Transaction Synchronization

This section refers to producer-only transactions (transactions not started by a listener container); see Using Consumer-Initiated Transactions for information about chaining transactions when the container starts the transaction.

If you want to send records to Kafka and perform some database updates, you can use normal Spring transaction management with, say, a DataSourceTransactionManager:

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

The interceptor for the @Transactional annotation starts the transaction, and the KafkaTemplate synchronizes a transaction with that transaction manager; each send participates in that transaction. When the method exits, the database transaction commits, followed by the Kafka transaction. If you wish the commits to be performed in the reverse order (Kafka first), use nested @Transactional methods, with the outer method configured to use the DataSourceTransactionManager and the inner method configured to use the KafkaTransactionManager, as shown in the sketch below.
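
The following sketch shows that nested arrangement; the bean names dstm and kafkaTransactionManager are assumptions for illustration, and the inner method must be on a separate bean so that its @Transactional proxy is applied:

@Transactional("dstm") // outer: DB transaction starts first, so it commits last
public void process(List<Thing> things) {
    updateDb(things);
    this.inner.sendToKafka(things);
}

// on a separate bean ("inner"):
@Transactional("kafkaTransactionManager") // inner: Kafka transaction commits first, on method exit
public void sendToKafka(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
}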

See Examples of Kafka Transactions with Other Transaction Managers for examples of an application that synchronizes JDBC and Kafka transactions in Kafka-first or DB-first configurations.

Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller. Previously, this was silently ignored (logged at debug level). Applications should take remedial action, if necessary, to compensate for the committed primary transaction.

Using Consumer-Initiated Transactions

The ChainedKafkaTransactionManager has been deprecated since version 2.7; see the JavaDocs for its superclass ChainedTransactionManager for more information. Instead, use a KafkaTransactionManager in the container to start the Kafka transaction and annotate the listener method with @Transactional to start the other transaction, as sketched below.
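
A sketch of this arrangement, assuming a recent version where ContainerProperties exposes setKafkaAwareTransactionManager (earlier versions use the transactionManager property); the bean name ktm and the @Transactional qualifier dstm are assumptions for illustration:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> cf, KafkaTransactionManager<String, String> ktm) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf);
    factory.getContainerProperties().setKafkaAwareTransactionManager(ktm); // container starts the Kafka transaction
    return factory;
}

@KafkaListener(id = "txGroup", topics = "topic")
@Transactional("dstm") // starts the DB transaction inside the Kafka transaction
public void listen(String in) {
    // database work runs in the DB transaction; any sends join the Kafka transaction
}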

See Examples of Kafka Transactions with Other Transaction Managers for an example application that chains JDBC and Kafka transactions.

Non-blocking retries cannot be combined with container transactions: when the listener code throws an exception, the container transaction commit succeeds and the record is sent to the retryable topic.

KafkaTemplate Local Transactions

You can use the KafkaTemplate to execute a series of operations within a local transaction. The following example shows how to do so:

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

The argument in the callback is the template itself (this). If the callback exits normally, the transaction is committed. If an exception is thrown, the transaction is rolled back.

If there is a KafkaTransactionManager (or synchronized) transaction in process, it is not used. Instead, a new "nested" transaction is used.

TransactionIdPrefix

With EOSMode.V2 (aka BETA), the only supported mode, it is no longer necessary to use the same transactional.id, even for consumer-initiated transactions; in fact, the transactionIdPrefix must have a unique value on each application instance, the same as for producer-initiated transactions.
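
One way to keep the prefix unique is to derive it from an externalized property; the instance.id property name here is an assumption, and producerConfigs() follows the convention used elsewhere in this section:

@Bean
public ProducerFactory<String, String> producerFactory(@Value("${instance.id}") String instanceId) {
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
    pf.setTransactionIdPrefix("tx-" + instanceId + "-"); // unique per application instance
    return pf;
}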

TransactionIdSuffix Fixed

Starting with version 3.2, a new TransactionIdSuffixStrategy interface was introduced to manage the transactional.id suffix. The default implementation is DefaultTransactionIdSuffixStrategy; when maxCache is set to a value greater than zero, it reuses transactional.id suffixes within a specific range; otherwise, suffixes are generated on the fly by incrementing a counter. When a transactional producer is requested and all transactional.id values are in use, a NoProducerAvailableException is thrown. You can then use a RetryTemplate configured to retry that exception, with a suitably configured back off.

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        // ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        // ...
        pf.setTransactionIdPrefix("my.txid.");
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

When setting maxCache to 5, the transactional.id values are my.txid.{0-4}.

When using KafkaTransactionManager with the ConcurrentMessageListenerContainer and enabling maxCache, it is necessary to set maxCache to a value greater than or equal to concurrency. If a MessageListenerContainer is unable to acquire a transactional.id suffix, it will throw a NoProducerAvailableException. When using nested transactions in the ConcurrentMessageListenerContainer, it is necessary to adjust the maxCache setting to handle the increased number of nested transactions.
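
A sketch of a RetryTemplate (from spring-retry) for the NoProducerAvailableException case mentioned above; the attempt count and back off are illustrative:

RetryTemplate retryTemplate = RetryTemplate.builder()
        .maxAttempts(5)
        .fixedBackoff(1000)
        .retryOn(NoProducerAvailableException.class)
        .build();

retryTemplate.execute(ctx ->
        // acquiring the transactional producer is retried while all suffixes are in use
        this.kafkaTemplate.executeInTransaction(t -> t.send("topic", "value")));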

KafkaTemplate Transactional and non-Transactional Publishing

Normally, when a KafkaTemplate is transactional (configured with a transaction-capable producer factory), transactions are required. The transaction can be started by a TransactionTemplate, a @Transactional method, calling executeInTransaction, or by a listener container, when configured with a KafkaTransactionManager. Any attempt to use the template outside the scope of a transaction results in the template throwing an IllegalStateException. Starting with version 2.4.3, you can set the template’s allowNonTransactional property to true. In that case, the template will allow the operation to run without a transaction, by calling the ProducerFactory's createNonTransactionalProducer() method; the producer will be cached, or thread-bound, as normal for reuse. See Using DefaultKafkaProducerFactory.
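
For example, a sketch using the setter on an otherwise transactional template (transactionalProducerFactory stands in for any transaction-capable factory):

KafkaTemplate<String, String> template = new KafkaTemplate<>(transactionalProducerFactory);
template.setAllowNonTransactional(true);
// outside any transaction, this uses a non-transactional producer instead of throwing
template.send("topic", "value");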

Transactions with Batch Listeners

When a listener fails while transactions are being used, the AfterRollbackProcessor is invoked to take some action after the rollback occurs. When using the default AfterRollbackProcessor with a record listener, seeks are performed so that the failed record will be redelivered. With a batch listener, however, the whole batch will be redelivered because the framework doesn’t know which record in the batch failed. See After-rollback Processor for more information.
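
For example, a sketch configuring the default processor on a container factory, with a recoverer (such as a DeadLetterPublishingRecoverer) and at most two redeliveries; the back off values are illustrative:

DefaultAfterRollbackProcessor<String, String> processor =
        new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 2L));
factory.setAfterRollbackProcessor(processor);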

When using a batch listener, version 2.4.2 introduced an alternative mechanism to deal with failures while processing a batch: the BatchToRecordAdapter. When a container factory with batchListener set to true is configured with a BatchToRecordAdapter, the listener is invoked with one record at a time. This enables error handling within the batch, while still making it possible to stop processing the entire batch, depending on the exception type. A default BatchToRecordAdapter is provided that can be configured with a standard ConsumerRecordRecoverer, such as the DeadLetterPublishingRecoverer. The following test case configuration snippet illustrates how to use this feature:

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}