For the latest stable version, please use Spring Integration 6.4.1! |
Transaction Support
Starting with version 5.0, a new TransactionHandleMessageAdvice
has been introduced to make the whole downstream flow transactional, thanks to the HandleMessageAdvice
implementation.
When a regular TransactionInterceptor
is used in the <request-handler-advice-chain>
element (for example, through configuring <tx:advice>
), a started transaction is applied only for an internal AbstractReplyProducingMessageHandler.handleRequestMessage()
and is not propagated to the downstream flow.
To simplify XML configuration, along with the <request-handler-advice-chain>
, a <transactional>
element has been added to all <outbound-gateway>
and <service-activator>
and related components.
The following example shows <transactional>
in use:
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
If you are familiar with the JPA integration components, such a configuration is not new, but now we can start a transaction from any point in our flow — not only from the <poller>
or a message-driven channel adapter such as JMS.
Java configuration can be simplified by using the TransactionInterceptorBuilder
, and the result bean name can be used in the messaging annotations adviceChain
attribute, as the following example shows:
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
Note the true
parameter on the TransactionInterceptorBuilder
constructor.
It causes the creation of a TransactionHandleMessageAdvice
, not a regular TransactionInterceptor
.
Java DSL supports an Advice
through the .transactional()
options on the endpoint configuration, as the following example shows:
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}