Transactional Binder

Enable transactions by setting spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix to a non-empty value, e.g. tx-. When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction. When the listener exits normally, the listener container will send the offset to the transaction and commit it. A common producer factory is used for all producer bindings configured using spring.cloud.stream.kafka.binder.transaction.producer.* properties; individual binding Kafka producer properties are ignored.
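As a minimal sketch of the configuration described above, the fragment below enables transactions and sets one property on the shared producer factory. The `tx-` prefix comes from the text; the `configuration.acks` entry is only an illustrative assumption about which producer setting you might want to apply to all producer bindings.

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transactionIdPrefix: tx-   # any non-empty value enables transactions
            producer:
              configuration:
                acks: all              # assumed example of a shared producer setting
```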

Normal binder retries (and dead lettering) are not supported with transactions because the retries would run in the original transaction, which may be rolled back, and any records published during those attempts would be rolled back with it. When retries are enabled (the common property maxAttempts is greater than zero), the retry properties are instead used to configure a DefaultAfterRollbackProcessor, which enables retries at the container level. Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the DefaultAfterRollbackProcessor, which runs after the main transaction has rolled back.

If you wish to use transactions in a source application, or from some arbitrary thread for a producer-only transaction (e.g. a @Scheduled method), you must get a reference to the transactional producer factory and define a KafkaTransactionManager bean using it.

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    // Passing null as the binder name selects the only configured binder.
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId);
    return tm;
}

Notice that we get a reference to the binder using the BinderFactory; use null in the first argument when there is only one binder configured. If more than one binder is configured, use the binder name to get the reference. Once we have a reference to the binder, we can obtain a reference to the ProducerFactory and create a transaction manager.

You can then use normal Spring transaction support, such as TransactionTemplate or @Transactional. For example:

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

If you wish to synchronize producer-only transactions with those from some other transaction manager, use a ChainedTransactionManager.

If you deploy multiple instances of your application, each instance needs a unique transactionIdPrefix.
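One way to satisfy this requirement is to derive the prefix from something that already differs per instance. The following is a minimal sketch in plain Java; the helper name `prefixFor`, the `tx-<index>-` naming scheme, and the `INSTANCE_INDEX` environment variable are all hypothetical and not part of the binder.

```java
final class TransactionIdPrefixSketch {

    // Hypothetical helper: appends the instance index so that each instance
    // ends up with a different prefix, e.g. base "tx-" and index 2 give "tx-2-".
    static String prefixFor(String base, int instanceIndex) {
        if (base == null || base.isEmpty()) {
            throw new IllegalArgumentException("base prefix must not be empty");
        }
        if (instanceIndex < 0) {
            throw new IllegalArgumentException("instance index must not be negative");
        }
        return base + instanceIndex + "-";
    }

    public static void main(String[] args) {
        // INSTANCE_INDEX is an assumed environment variable, defaulting to 0.
        String index = System.getenv().getOrDefault("INSTANCE_INDEX", "0");
        System.out.println(prefixFor("tx-", Integer.parseInt(index)));
    }
}
```

The resulting value would then be supplied as the transactionIdPrefix for that instance, for example through an environment-specific property.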

Exception Retry Behavior in Kafka Transactions

Configuring Transaction Rollback Retry Behavior

When processing messages within a Kafka transaction, you can configure which exceptions should be retried after a transaction rollback using the defaultRetryable property and the retryableExceptions map.

Default Retry Behavior

The DefaultAfterRollbackProcessor determines which exceptions trigger a retry after a transaction rollback. By default, all exceptions will be retried, but you can modify this behavior:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          <binding-name>:
            consumer:
              defaultRetryable: false  # Change default to NOT retry exceptions

When defaultRetryable is set to false, the DefaultAfterRollbackProcessor will be configured with defaultFalse(true), meaning exceptions will not be retried unless explicitly configured as retryable.

Exception-Specific Configuration

For fine-grained control, you can specify retry behavior for individual exception types:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          <binding-name>:
            consumer:
              retryableExceptions:
                java.lang.IllegalStateException: true     # Always retry this exception
                java.lang.IllegalArgumentException: false # Never retry this exception

The DefaultAfterRollbackProcessor will use addRetryableExceptions() for exceptions marked as true and addNotRetryableExceptions() for those marked as false. These exception-specific configurations take precedence over the default behavior.
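The precedence rule can be illustrated with a small, self-contained model. This is a simplified sketch in plain Java, not the classifier that Spring for Apache Kafka actually uses: the class `RetryDecisionSketch` and its method are hypothetical, and the sketch assumes that a configured type also covers its subclasses while ignoring exception causes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of the documented precedence:
// an explicit entry for the exception type wins, otherwise the default applies.
final class RetryDecisionSketch {

    private final boolean defaultRetryable;
    private final Map<Class<? extends Exception>, Boolean> retryableExceptions;

    RetryDecisionSketch(boolean defaultRetryable,
            Map<Class<? extends Exception>, Boolean> retryableExceptions) {
        this.defaultRetryable = defaultRetryable;
        this.retryableExceptions = new LinkedHashMap<>(retryableExceptions);
    }

    // Walk up the class hierarchy; the closest explicitly configured type wins,
    // otherwise fall back to defaultRetryable.
    boolean shouldRetry(Exception ex) {
        for (Class<?> type = ex.getClass(); type != null; type = type.getSuperclass()) {
            Boolean configured = this.retryableExceptions.get(type);
            if (configured != null) {
                return configured;
            }
        }
        return this.defaultRetryable;
    }

    public static void main(String[] args) {
        Map<Class<? extends Exception>, Boolean> overrides = new LinkedHashMap<>();
        overrides.put(IllegalStateException.class, true);
        overrides.put(IllegalArgumentException.class, false);
        RetryDecisionSketch decision = new RetryDecisionSketch(false, overrides);

        System.out.println(decision.shouldRetry(new IllegalStateException("explicit true")));     // true
        System.out.println(decision.shouldRetry(new IllegalArgumentException("explicit false"))); // false
        System.out.println(decision.shouldRetry(new RuntimeException("uses the default")));       // false
    }
}
```

With defaultRetryable set to false, only IllegalStateException is retried in this model; flipping the default to true would make every exception retryable except IllegalArgumentException and its subclasses.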

Implementation Details

  • Only exception types (subclasses of Exception) can be configured in retryableExceptions when using transactions

  • An IllegalArgumentException will be thrown if non-Exception types are specified

  • The DefaultAfterRollbackProcessor is only configured when transactions are enabled and batch mode is disabled

  • This configuration ensures that the transaction retry behavior is consistent with non-transactional retry handling
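The type restriction in the first two points can be sketched as a validation step. This is an illustrative model in plain Java under stated assumptions: the class `RetryableExceptionsValidationSketch`, its `validate` method, and the error message are hypothetical and do not reproduce the binder's own code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RetryableExceptionsValidationSketch {

    // Hypothetical helper: accepts only subclasses of Exception and rejects
    // other Throwable types (such as Error subclasses) with an IllegalArgumentException.
    static Map<Class<? extends Exception>, Boolean> validate(
            Map<Class<? extends Throwable>, Boolean> configured) {
        Map<Class<? extends Exception>, Boolean> validated = new LinkedHashMap<>();
        configured.forEach((type, retryable) -> {
            if (!Exception.class.isAssignableFrom(type)) {
                throw new IllegalArgumentException(
                        "Only Exception types can be configured, but got: " + type.getName());
            }
            validated.put(type.asSubclass(Exception.class), retryable);
        });
        return validated;
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, Boolean> configured = new LinkedHashMap<>();
        configured.put(IllegalStateException.class, true);
        configured.put(IllegalArgumentException.class, false);
        System.out.println(validate(configured).size());
    }
}
```

In this model, an entry such as java.lang.OutOfMemoryError fails validation because it extends Error rather than Exception.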