Transaction Support

Starting with version 5.0, a new TransactionHandleMessageAdvice has been introduced to make the whole downstream flow transactional, thanks to the HandleMessageAdvice implementation. When a regular TransactionInterceptor is used in the <request-handler-advice-chain> element (for example, through configuring <tx:advice>), a started transaction is applied only for an internal AbstractReplyProducingMessageHandler.handleRequestMessage() and is not propagated to the downstream flow.

To simplify XML configuration, along with the <request-handler-advice-chain>, a <transactional> element has been added to all <outbound-gateway> and <service-activator> and related components. The following example shows <transactional> in use:

<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
        <int-jdbc:transactional/>
</int-jdbc:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

If you are familiar with the JPA integration components, such a configuration is not new, but now we can start a transaction from any point in our flow — not only from the <poller> or a message-driven channel adapter such as JMS.

Java configuration can be simplified by using the TransactionInterceptorBuilder, and the result bean name can be used in the messaging annotations adviceChain attribute, as the following example shows:

@Bean
public ConcurrentMetadataStore store() {
    return new SimpleMetadataStore(hazelcastInstance()
                       .getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    message -> message.getPayload().toString(),
                    message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(this.transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRES_NEW)
                .build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
         outputChannel = "output",
         adviceChain = { "idempotentReceiverInterceptor",
                 "transactionInterceptor" })
public Transformer transformer() {
    return message -> message;
}

Note the true parameter on the TransactionInterceptorBuilder constructor. It causes the creation of a TransactionHandleMessageAdvice, not a regular TransactionInterceptor.

Java DSL supports an Advice through the .transactional() options on the endpoint configuration, as the following example shows:

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
        .handle(Jpa.updatingGateway(this.entityManagerFactory),
                e -> e.transactional(true))
        .channel(c -> c.queue("persistResults"));
}