Transactional Binder
Enable transactions by setting spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix to a non-empty value, e.g. tx-.
When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction.
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings; it is configured using the spring.cloud.stream.kafka.binder.transaction.producer.* properties, and individual binding Kafka producer properties are ignored.
Normal binder retries (and dead lettering) are not supported with transactions because the retries would run in the original transaction, which may be rolled back, and any records published in it would be rolled back too.
When retries are enabled (the common property maxAttempts is greater than zero) the retry properties are used to configure a DefaultAfterRollbackProcessor to enable retries at the container level.
Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the DefaultAfterRollbackProcessor which runs after the main transaction has rolled back.
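The flow described above can be pictured with a small, library-free model. This is only a conceptual sketch, not the binder's or Spring Kafka's implementation: AfterRollbackModel and its members are hypothetical names, maxAttempts is assumed to count total deliveries, and a per-attempt list stands in for records sent inside the transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Conceptual model only: every name here is hypothetical; nothing below calls Spring or Kafka.
class AfterRollbackModel {

    final List<String> committed = new ArrayList<>();   // records visible after a commit
    final List<String> deadLetters = new ArrayList<>(); // records handed to the dead-letter step

    /**
     * Delivers one record up to maxAttempts times and returns the number of attempts made.
     * The listener receives the record and a buffer standing in for sends inside the transaction.
     */
    int deliver(String record, int maxAttempts, BiConsumer<String, List<String>> listener) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            List<String> sentInTx = new ArrayList<>(); // begin a fresh "transaction"
            try {
                listener.accept(record, sentInTx);
                committed.addAll(sentInTx);            // commit: sends become visible
                return attempt;
            } catch (RuntimeException e) {
                // rollback: sentInTx is discarded, so nothing from this attempt is visible
            }
        }
        // Retries exhausted: dead-letter only after the last rollback has completed.
        deadLetters.add(record);
        return maxAttempts;
    }
}
```

Because each attempt starts with an empty buffer, output from a failed attempt never leaks into the committed list, which is the property that in-transaction retries cannot provide.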
If you wish to use transactions in a source application, or from some arbitrary thread for a producer-only transaction (e.g. a @Scheduled method), you must get a reference to the transactional producer factory and define a KafkaTransactionManager bean using it.
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    // A null binder name selects the only configured binder.
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId);
    return tm;
}
Notice that we get a reference to the binder using the BinderFactory; use null in the first argument when there is only one binder configured.
If more than one binder is configured, use the binder name to get the reference.
Once we have a reference to the binder, we can obtain a reference to the ProducerFactory and create a transaction manager.
Then you would use normal Spring transaction support, e.g. TransactionTemplate or @Transactional, for example:
public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a ChainedTransactionManager.
If you deploy multiple instances of your application, each instance needs a unique transactionIdPrefix.
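One simple way to obtain a distinct prefix per instance is to combine a shared base with an instance index supplied by the deployment. The helper below is a hypothetical sketch (TxIdPrefixes and forInstance are not part of the binder), and it assumes each instance is started with a different, non-negative index.

```java
// Hypothetical helper, not part of the binder: builds a per-instance transaction id prefix.
class TxIdPrefixes {

    /** Returns base + instanceIndex + "-", e.g. forInstance("tx-", 0) gives "tx-0-". */
    static String forInstance(String base, int instanceIndex) {
        if (base == null || base.isEmpty()) {
            throw new IllegalArgumentException("base prefix must be non-empty");
        }
        if (instanceIndex < 0) {
            throw new IllegalArgumentException("instanceIndex must be >= 0");
        }
        // The trailing separator keeps "tx-1-" + "0" distinct from "tx-10-" + "".
        return base + instanceIndex + "-";
    }
}
```

The resulting value could then be supplied as the transactionIdPrefix property (or as a placeholder such as unique.tx.id.per.instance) when each instance is launched.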
Exception Retry Behavior in Kafka Transactions
Configuring Transaction Rollback Retry Behavior
When processing messages within a Kafka transaction, you can configure which exceptions should be retried after a transaction rollback using the defaultRetryable property and the retryableExceptions map.
Default Retry Behavior
The DefaultAfterRollbackProcessor determines which exceptions trigger a retry after a transaction rollback.
By default, all exceptions will be retried, but you can modify this behavior:
spring:
  cloud:
    stream:
      kafka:
        bindings:
          <binding-name>:
            consumer:
              defaultRetryable: false # Change default to NOT retry exceptions
When defaultRetryable is set to false, the DefaultAfterRollbackProcessor will be configured with defaultFalse(true), meaning exceptions will not be retried unless explicitly configured as retryable.
Exception-Specific Configuration
For fine-grained control, you can specify retry behavior for individual exception types:
spring:
  cloud:
    stream:
      kafka:
        bindings:
          <binding-name>:
            consumer:
              retryableExceptions:
                java.lang.IllegalStateException: true # Always retry this exception
                java.lang.IllegalArgumentException: false # Never retry this exception
The DefaultAfterRollbackProcessor will use addRetryableExceptions() for exceptions marked as true and addNotRetryableExceptions() for those marked as false.
These exception-specific configurations take precedence over the default behavior.
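To make the precedence concrete, here is a simplified, library-free model of how a per-exception entry overrides the default. RetryDecision is a hypothetical class, not the binder's or Spring Kafka's classifier; matching subclasses through the nearest configured superclass is an assumption of this sketch, and it ignores exception causes and any built-in fatal exceptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration; RetryDecision is a hypothetical name, not a binder or Spring Kafka class.
class RetryDecision {

    private final boolean defaultRetryable;
    private final Map<Class<? extends Exception>, Boolean> overrides = new LinkedHashMap<>();

    RetryDecision(boolean defaultRetryable) {
        this.defaultRetryable = defaultRetryable;
    }

    /** Registers one retryableExceptions-style entry and returns this for chaining. */
    RetryDecision with(Class<? extends Exception> type, boolean retryable) {
        overrides.put(type, retryable);
        return this;
    }

    boolean shouldRetry(Exception ex) {
        // Walk from the thrown type up through its superclasses; the nearest entry wins.
        for (Class<?> c = ex.getClass(); c != null; c = c.getSuperclass()) {
            Boolean configured = overrides.get(c);
            if (configured != null) {
                return configured;
            }
        }
        // No entry matched, so fall back to the defaultRetryable setting.
        return defaultRetryable;
    }
}
```

With defaultRetryable set to false and the two entries from the YAML above, an IllegalStateException would be retried in this model, while an IllegalArgumentException or an unlisted RuntimeException would not.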
Implementation Details
- Only exception types (subclasses of Exception) can be configured in retryableExceptions when using transactions.
- An IllegalArgumentException will be thrown if non-Exception types are specified.
- The DefaultAfterRollbackProcessor is only configured when transactions are enabled and batch mode is disabled.
- This configuration ensures that the transaction retry behavior is consistent with non-transactional retry handling.
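The type restriction in the first two points can be sketched as a plain validation step. This is an illustration under assumptions, not the binder's source: RetryableExceptionCheck, validate, and the error message text are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical validation sketch; the class name, method name and message are illustrative only.
class RetryableExceptionCheck {

    /** Rejects any configured type that is not an Exception subclass (for example an Error). */
    static Map<Class<? extends Exception>, Boolean> validate(
            Map<Class<? extends Throwable>, Boolean> configured) {

        Map<Class<? extends Exception>, Boolean> result = new LinkedHashMap<>();
        configured.forEach((type, retryable) -> {
            if (!Exception.class.isAssignableFrom(type)) {
                throw new IllegalArgumentException(
                        "Only Exception types can be configured: " + type.getName());
            }
            // Safe narrowing: the check above guarantees type is an Exception subclass.
            result.put(type.asSubclass(Exception.class), retryable);
        });
        return result;
    }
}
```

In this sketch, an entry such as java.lang.OutOfMemoryError would be rejected because it extends Error rather than Exception.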