Transaction Support

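This chapter covers Spring Integration’s support for transactions. It covers the following topics:

  • Understanding Transactions in Message Flows

  • Transaction Boundaries

  • Transaction Synchronization

  • Pseudo Transactions

  • Reactive Transactions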

Understanding Transactions in Message Flows

Spring Integration exposes several hooks to address the transactional needs of your message flows. To better understand these hooks and how you can benefit from them, we must first revisit the six mechanisms that you can use to initiate message flows and see how you can address the transactional needs of these flows within each of these mechanisms.

The following six mechanisms initiate a message flow (details for each are provided throughout this manual):

  • Gateway proxy: A basic messaging gateway.

  • Message channel: Direct interactions with MessageChannel methods (for example, channel.send(message)).

  • Message publisher: The way to initiate a message flow as the by-product of method invocations on Spring beans.

  • Inbound channel adapters and gateways: The way to initiate a message flow based on connecting a third-party system with the Spring Integration messaging system (for example, [JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel).

  • Scheduler: The way to initiate a message flow based on scheduling events distributed by a pre-configured scheduler.

  • Poller: Similar to the scheduler, this is the way to initiate a message flow based on scheduling or interval-based events distributed by a pre-configured poller.

We can split these six mechanisms into two general categories:

  • Message flows initiated by a user process: Example scenarios in this category include invoking a gateway method or explicitly sending a Message to a MessageChannel. In other words, these message flows depend on a third-party process (such as some code that you wrote) to be initiated.

  • Message flows initiated by a daemon process: Example scenarios in this category include a Poller polling a message queue to initiate a new message flow with the polled message or a scheduler scheduling the process by creating a new message and initiating a message flow at a predefined time.

Clearly, the gateway proxy, MessageChannel.send(…), and MessagePublisher all belong to the first category, and inbound adapters and gateways, the scheduler, and the poller belong to the second category.

So, how can you address transactional needs in various scenarios within each category, and is there a need for Spring Integration to provide something explicit with regard to transactions for a particular scenario? Or can you use Spring’s transaction support instead?

Spring itself provides first-class support for transaction management. So our goal here is not to provide something new but rather to use Spring and benefit from its existing support for transactions. In other words, as a framework, we must expose hooks to Spring’s transaction management functionality. However, since Spring Integration configuration is based on Spring configuration, we need not always expose these hooks, because Spring already exposes them. After all, every Spring Integration component is a Spring bean.

With this goal in mind, we can again consider the two scenarios: message flows initiated by a user process and message flows initiated by a daemon.

Message flows that are initiated by a user process and configured in a Spring application context are subject to the usual transactional configuration of such processes. Therefore, they need not be explicitly configured by Spring Integration to support transactions. The transaction could and should be initiated through Spring’s standard transaction support. The Spring Integration message flow naturally honors the transactional semantics of the components, because it is itself configured by Spring. For example, a gateway or service activator method could be annotated with @Transactional, or a TransactionInterceptor could be defined in an XML configuration with a pointcut expression that points to specific methods that should be transactional. The bottom line is that you have full control over transaction configuration and boundaries in these scenarios.

However, things are a bit different when it comes to message flows initiated by a daemon process. Although configured by the developer, these flows do not directly involve a human or some other process to be initiated. These are trigger-based flows that are initiated by a trigger process (a daemon process) based on the configuration of the process. For example, we could have a scheduler initiate a message flow every Friday night. We can also configure a trigger that initiates a message flow every second, and so on. As a result, we need a way to tell these trigger-based processes that the resulting message flows should be transactional, so that a transaction context can be created whenever a new message flow is initiated. In other words, we need to expose some transaction configuration, but only enough to delegate to the transaction support already provided by Spring (as we do in other scenarios).

Poller Transaction Support

Spring Integration provides transactional support for pollers. Pollers are a special type of component because, within a poller task, we can call receive() against a resource that is itself transactional, thus including the receive() call in the boundaries of the transaction, which lets it be rolled back in case of a task failure. If we were to add the same support for channels, the added transactions would affect all downstream components starting with the send() call. That provides a rather wide scope for transaction demarcation without any strong reason, especially when Spring already provides several ways to address the transactional needs of any component downstream. However, the receive() method being included in a transaction boundary is the “strong reason” for pollers.
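As a rough illustration of why including receive() in the transaction matters, the following plain-Java sketch models one poll against a resource whose receive can be undone. It does not use any Spring API: TransactionalQueueSketch, pollOnce, commit, and rollback are hypothetical names introduced only for this example, and a real transactional resource (such as a JMS session or a JDBC-backed store) would do this work itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical model (not Spring API): a queue whose receive() can be undone. */
class TransactionalQueueSketch {

    private final Deque<String> committed = new ArrayDeque<>();
    private final Deque<String> receivedInTx = new ArrayDeque<>();

    void add(String message) {
        committed.addLast(message);
    }

    /** Removes the head message and remembers it until commit or rollback. */
    String receive() {
        String message = committed.pollFirst();
        if (message != null) {
            receivedInTx.addLast(message);
        }
        return message;
    }

    /** Makes the removals permanent. */
    void commit() {
        receivedInTx.clear();
    }

    /** Puts uncommitted messages back at the head, in their original order. */
    void rollback() {
        while (!receivedInTx.isEmpty()) {
            committed.addFirst(receivedInTx.pollLast());
        }
    }

    int size() {
        return committed.size();
    }

    /** One poll: receive() and the downstream flow share a single unit of work. */
    static boolean pollOnce(TransactionalQueueSketch queue, Consumer<String> flow) {
        try {
            String message = queue.receive();
            if (message != null) {
                flow.accept(message);
            }
            queue.commit();
            return true;
        } catch (RuntimeException e) {
            // The failed task undoes the receive, so the message is not lost.
            queue.rollback();
            return false;
        }
    }
}
```

In this sketch, a poll whose downstream flow throws leaves the message on the queue, so a later poll can pick it up again.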

Any time you configure a poller, you can provide transactional configuration by using the transactional child element and its attributes, as the following example shows:

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <int:transactional transaction-manager="txManager"
                       isolation="DEFAULT"
                       propagation="REQUIRED"
                       read-only="true"
                       timeout="1000"/>
</int:poller>

The preceding configuration looks similar to a native Spring transaction configuration. You must still provide a reference to a transaction manager and either specify transaction attributes or rely on defaults (for example, if the 'transaction-manager' attribute is not specified, it defaults to the bean named 'transactionManager'). Internally, the process is wrapped in Spring’s native transaction, where TransactionInterceptor is responsible for handling transactions. For more information on how to configure a transaction manager, the types of transaction managers (such as JTA, DataSource, and others), and other details related to transaction configuration, see the Spring Framework Reference Guide.

With the preceding configuration, all message flows initiated by this poller are transactional. For more information and details on a poller’s transactional configuration, see Polling and Transactions.

Along with transactions, you might need to address several more cross-cutting concerns when you run a poller. To help with that, the poller element accepts an <advice-chain> child element, which lets you define a custom chain of advice instances to be applied on the poller. (See Pollable Message Source for more details.) In Spring Integration 2.0, the poller went through a refactoring effort and now uses a proxy mechanism to address transactional concerns as well as other cross-cutting concerns. One of the significant changes evolving from this effort is that the <transactional> and <advice-chain> elements are now mutually exclusive. The rationale is that, if you need more than one advice and one of them is a transaction advice, you can include it in the <advice-chain> with the same convenience as before but with much more control, since you can now position the advice in the desired order. The following example shows how to do so:

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <int:advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someOtherAdviceBean"/>
    <bean class="foo.bar.SampleAdvice"/>
  </int:advice-chain>
</int:poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

The preceding example shows a basic XML-based configuration of Spring transaction advice (txAdvice) and includes it within the <advice-chain> defined by the poller. If you need to address only the transactional concerns of the poller, you can still use the <transactional> element as a convenience.

Transaction Boundaries

Another important factor is the boundaries of transactions within a message flow. When a transaction is started, the transaction context is bound to the current thread. So, regardless of how many endpoints and channels you have in your message flow, your transaction context is preserved as long as the flow continues on the same thread. As soon as you break it by introducing a pollable channel or executor channel, or by starting a new thread manually in some service, the transactional boundary is broken as well. Essentially, the transaction ends right there and, if a successful handoff has transpired between the threads, the flow is considered a success and a commit is issued, even though the flow continues and might still result in an exception somewhere downstream. If such a flow were synchronous, that exception could be thrown back to the initiator of the message flow, who is also the initiator of the transactional context, and the transaction would be rolled back. The middle ground is to use transactional channels at any point where a thread boundary is being crossed. For example, you can use a queue-backed channel that delegates to a transactional MessageStore strategy, or you could use a JMS-backed channel.
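The thread-bound nature of the transaction context can be sketched in plain Java. The following example is only a model: it uses a ThreadLocal as a stand-in for the context that a transaction manager binds to the current thread, and the class name, the CURRENT_TX field, and the "tx-1" identifier are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model (not Spring API) of a thread-bound transaction context. */
class ThreadBoundTransactionSketch {

    // Stand-in for the context a transaction manager binds to the current thread.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** What a downstream endpoint sees when it asks for the current transaction. */
    static String currentTransaction() {
        return CURRENT_TX.get();
    }

    /** Returns the context seen on the same thread [0] and after a hand-off [1]. */
    static String[] observe() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("tx-1");
        try {
            // A direct (same-thread) call still sees the transaction.
            String sameThread = currentTransaction();
            // Handing off to an executor crosses the thread boundary.
            Callable<String> downstream = ThreadBoundTransactionSketch::currentTransaction;
            String otherThread = executor.submit(downstream).get();
            return new String[] {sameThread, otherThread};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TX.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("same thread:  " + seen[0]); // prints "same thread:  tx-1"
        System.out.println("other thread: " + seen[1]); // prints "other thread: null"
    }
}
```

The second endpoint sees no transaction because nothing carried the context across the hand-off, which is the situation a pollable or executor channel creates.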

Transaction Synchronization

In some environments, it helps to synchronize operations with a transaction that encompasses the entire flow. For example, consider a <file:inbound-channel-adapter/> at the start of a flow that performs a number of database updates. If the transaction commits, we might want to move the file to a success directory, while we might want to move it to a failure directory if the transaction rolls back.

Spring Integration 2.2 introduced the capability of synchronizing these operations with a transaction. In addition, you can configure a PseudoTransactionManager if you do not have a 'real' transaction but still want to perform different actions on success or failure. For more information, see Pseudo Transactions.

The following listing shows the key strategy interfaces for this feature:

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

The factory is responsible for creating a TransactionSynchronization object. You can implement your own or use the one provided by the framework: DefaultTransactionSynchronizationFactory. This implementation returns a TransactionSynchronization that delegates to a default implementation of TransactionSynchronizationProcessor: ExpressionEvaluatingTransactionSynchronizationProcessor. This processor supports three SpEL expressions: beforeCommitExpression, afterCommitExpression, and afterRollbackExpression.

These actions should be self-explanatory to those familiar with transactions. In each case, the #root variable is the original Message. In some cases, other SpEL variables are made available, depending on the MessageSource being polled by the poller. For example, the MongoDbMessageSource provides the #mongoTemplate variable, which references the message source’s MongoTemplate. Similarly, the RedisStoreMessageSource provides the #store variable, which references the RedisStore created by the poll.

To enable the feature for a particular poller, you can provide a reference to the TransactionSynchronizationFactory on the poller’s <transactional/> element by using the synchronization-factory attribute.

Starting with version 5.0, Spring Integration provides PassThroughTransactionSynchronizationFactory, which is applied by default to polling endpoints when no TransactionSynchronizationFactory is configured but an advice of type TransactionInterceptor exists in the advice chain. When using any out-of-the-box TransactionSynchronizationFactory implementation, polling endpoints bind a polled message to the current transactional context and provide it as a failedMessage in a MessagingException if an exception is thrown after the transaction advice. When using a custom transaction advice that is not a TransactionInterceptor, you can explicitly configure a PassThroughTransactionSynchronizationFactory to achieve this behavior. In either case, the MessagingException becomes the payload of the ErrorMessage that is sent to the errorChannel, and the cause is the raw exception thrown by the advice. Previously, the ErrorMessage had a payload that was the raw exception thrown by the advice and did not provide a reference to the failedMessage information, making it difficult to determine the reasons for the transaction commit problem.

To simplify configuration of these components, Spring Integration provides namespace support for the default factory. The following example shows how to use the namespace to configure a file inbound channel adapter:

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

The result of the SpEL evaluation is sent as the payload to either committedChannel or rolledBackChannel (in this case, this would be Boolean.TRUE or Boolean.FALSE — the result of the java.io.File.renameTo() method call).

If you wish to send the entire payload for further Spring Integration processing, use the 'payload' expression.

It is important to understand that this synchronizes the actions with a transaction. It does not make a resource that is not inherently transactional actually be transactional. Instead, the transaction (be it JDBC or otherwise) is started before the poll and either committed or rolled back when the flow completes, followed by the synchronized action.

If you provide a custom TransactionSynchronizationFactory, it is responsible for creating a resource synchronization that causes the bound resource to be unbound automatically when the transaction completes. The default TransactionSynchronizationFactory does so by returning a subclass of ResourceHolderSynchronization, with the default shouldUnbindAtCompletion() returning true.

In addition to the after-commit and after-rollback expressions, before-commit is also supported. In that case, if the evaluation (or downstream processing) throws an exception, the transaction is rolled back instead of being committed.

Pseudo Transactions

After reading the Transaction Synchronization section, you might think it would be useful to take these 'success' or 'failure' actions when a flow completes, even if there are no “real” transactional resources (such as JDBC) downstream of the poller. For example, consider a <file:inbound-channel-adapter/> followed by an <ftp:outbound-channel-adapter/>. Neither of these components is transactional, but we might want to move the input file to different directories, based on the success or failure of the FTP transfer.

To provide this functionality, the framework provides a PseudoTransactionManager, enabling the above configuration even when there is no real transactional resource involved. If the flow completes normally, the beforeCommit and afterCommit synchronizations are called. On failure, the afterRollback synchronization is called. Because it is not a real transaction, no actual commit or rollback occurs. The pseudo transaction is a vehicle used to enable the synchronization features.
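The callback order described above can be modeled in a few lines of plain Java. This is a sketch under stated assumptions, not the framework's implementation: PseudoTransactionSketch, its nested Synchronization interface, and the run and trace methods are hypothetical names. Unlike the framework, the sketch swallows the failure and reports it as a boolean.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model (not Spring API) of pseudo-transaction callbacks. */
class PseudoTransactionSketch {

    /** Callback set mirroring the three synchronization phases. */
    interface Synchronization {
        void beforeCommit(String payload);
        void afterCommit(String payload);
        void afterRollback(String payload);
    }

    /** Runs the flow; nothing is really committed or rolled back, only callbacks fire. */
    static boolean run(String payload, Runnable flow, Synchronization sync) {
        try {
            flow.run();
            // A failure here also counts as a failed flow.
            sync.beforeCommit(payload);
        } catch (RuntimeException e) {
            sync.afterRollback(payload);
            return false;
        }
        sync.afterCommit(payload);
        return true;
    }

    /** Records which callbacks fired, as a comma-separated string. */
    static String trace(String payload, Runnable flow) {
        List<String> events = new ArrayList<>();
        run(payload, flow, new Synchronization() {
            @Override public void beforeCommit(String p) { events.add("beforeCommit"); }
            @Override public void afterCommit(String p) { events.add("afterCommit"); }
            @Override public void afterRollback(String p) { events.add("afterRollback"); }
        });
        return String.join(",", events);
    }
}
```

With this model, a flow that completes normally yields "beforeCommit,afterCommit", and a flow that throws yields "afterRollback". In the file-to-FTP scenario, the afterCommit and afterRollback callbacks are where the input file would be moved to a success or failure directory.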

To use a PseudoTransactionManager, you can define it as a <bean/>, in the same way you would configure a real transaction manager. The following example shows how to do so:

<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />

Reactive Transactions

Starting with version 5.3, a ReactiveTransactionManager can also be used together with a TransactionInterceptor advice for endpoints returning a reactive type. This includes MessageSource and ReactiveMessageHandler implementations (for example, ReactiveMongoDbMessageSource) that produce a message with a Flux or Mono payload. All other reply-producing message handler implementations can rely on a ReactiveTransactionManager when their reply payload is also a reactive type.