Appendix C. Transaction Support

C.1 Understanding Transactions in Message flows

Spring Integration exposes several hooks to address the transactional needs of your message flows. To better understand these hooks and how you can benefit from them, we must first revisit the six mechanisms that can be used to initiate Message flows and see how the transactional needs of these flows can be addressed within each of them.

Here are the six mechanisms for initiating a Message flow, with a short summary of each (details for each are provided throughout this manual):

  • Gateway Proxy - Your basic Messaging Gateway
  • MessageChannel - Direct interactions with MessageChannel methods (e.g., channel.send(message))
  • Message Publisher - the way to initiate message flow as the by-product of method invocations on Spring beans
  • Inbound Channel Adapters/Gateways - the way to initiate message flow based on connecting a third-party system with the Spring Integration messaging system (e.g., [JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel)
  • Scheduler - the way to initiate message flow based on scheduling events distributed by a pre-configured Scheduler
  • Poller - similar to the Scheduler and is the way to initiate message flow based on scheduling or interval-based events distributed by a pre-configured Poller

These six can be split into two general categories:

  • Message flows initiated by a USER process - Example scenarios in this category would be invoking a Gateway method or explicitly sending a Message to a MessageChannel. In other words, these message flows depend on a third party process (e.g., some code that we wrote) to be initiated.
  • Message flows initiated by a DAEMON process - Example scenarios in this category would be a Poller polling a Message queue to initiate a new Message flow with the polled Message or a Scheduler scheduling the process by creating a new Message and initiating a message flow at a predefined time.

Clearly, the Gateway Proxy, MessageChannel.send(..), and Message Publisher all belong to the first category, while Inbound Adapters/Gateways, Scheduler, and Poller belong to the second.

So, how do we address transactional needs in the various scenarios within each category, and is there a need for Spring Integration to provide something explicit with regard to transactions for a particular scenario? Or can Spring’s Transaction Support be leveraged instead?

The first and most obvious goal is NOT to re-invent something that has already been invented unless you can provide a better solution. In our case, Spring itself provides first-class support for transaction management. So our goal here is not to provide something new but rather to delegate to Spring and benefit from its existing support for transactions. In other words, as a framework we must expose hooks to the Transaction management functionality provided by Spring. But since Spring Integration configuration is based on Spring configuration, it is not always necessary to expose these hooks, as they are already exposed natively by Spring. Remember, every Spring Integration component is a Spring Bean after all.

With this goal in mind, let’s look at the two categories.

If you think about it, Message flows that are initiated by the USER process (Category 1), and obviously configured in a Spring Application Context, are subject to the transactional configuration of such processes and therefore don’t need to be explicitly configured by Spring Integration to support transactions. The transaction could and should be initiated through the standard Transaction support provided by Spring. The Spring Integration message flow will naturally honor the transactional semantics of the components because it is configured by Spring. For example, a Gateway or ServiceActivator method could be annotated with @Transactional, or a TransactionInterceptor could be defined in an XML configuration with a pointcut expression pointing to the specific methods that should be transactional. The bottom line is that you have full control over transaction configuration and boundaries in these scenarios.

However, things are a bit different when it comes to Message flows initiated by the DAEMON process (Category 2). Although configured by the developer these flows do not directly involve a human or some other process to be initiated. These are trigger-based flows that are initiated by a trigger process (DAEMON process) based on the configuration of such process. For example, we could have a Scheduler initiating a message flow every Friday night of every week. We can also configure a trigger that initiates a Message flow every second, etc. So, we obviously need a way to let these trigger-based processes know of our intention to make the resulting Message flows transactional so that a Transaction context could be created whenever a new Message flow is initiated. In other words we need to expose some Transaction configuration, but ONLY enough to delegate to Transaction support already provided by Spring (as we do in other scenarios).

Spring Integration provides transactional support for Pollers. Pollers are a special type of component because, within a poller task, we can call receive() against a resource that is itself transactional, thus including the receive() call within the boundaries of the transaction and allowing it to be rolled back if the task fails. If we were to add the same support for channels, the added transactions would affect all downstream components, starting with the send() call. That would provide a rather wide scope for transaction demarcation without any strong reason, especially when Spring already provides several ways to address the transactional needs of any downstream component. The inclusion of the receive() method in a transaction boundary, however, is the "strong reason" for pollers.
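Why including receive() in the transaction matters can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions, not Spring Integration code: ToyQueue, pollOnce, and the commit/rollback methods are hypothetical names standing in for a transactional resource and a poller task.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Simplified, hypothetical model of a transactional poll; not Spring Integration code.
final class TransactionalPollSketch {

    /** A toy transactional queue: a received message is removed for good only on commit. */
    static final class ToyQueue {
        private final Deque<String> messages = new ArrayDeque<>();
        private String inFlight; // message received in the current "transaction"

        void send(String message) { messages.addLast(message); }

        String receive() {
            inFlight = messages.pollFirst();
            return inFlight;
        }

        void commit() { inFlight = null; }

        void rollback() {
            if (inFlight != null) {
                messages.addFirst(inFlight); // the message becomes available again
                inFlight = null;
            }
        }

        int size() { return messages.size(); }
    }

    /** One poll: receive() and the downstream handler share one transaction boundary. */
    static boolean pollOnce(ToyQueue queue, Consumer<String> handler) {
        try {
            String message = queue.receive();
            if (message != null) {
                handler.accept(message);
            }
            queue.commit();
            return true;
        } catch (RuntimeException e) {
            queue.rollback(); // undo the receive() as well as the downstream work
            return false;
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("order-1");
        boolean committed = pollOnce(queue, m -> { throw new IllegalStateException("downstream failure"); });
        System.out.println("committed=" + committed + ", remaining=" + queue.size());
    }
}
```

In this model a downstream failure leaves the message in the queue, so a later poll can pick it up again. A real transactional resource (a JMS session or a JDBC-backed store, for example) achieves the same effect through its own transaction manager.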

C.1.1 Poller Transaction Support

Any time you configure a Poller you can provide transactional configuration via the transactional sub-element and its attributes:

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <int:transactional transaction-manager="txManager"
                       isolation="DEFAULT"
                       propagation="REQUIRED"
                       read-only="true"
                       timeout="1000"/>
</int:poller>

As you can see, this configuration looks very similar to native Spring transaction configuration. You must still provide a reference to a Transaction manager and either specify transaction attributes or rely on the defaults (e.g., if the transaction-manager attribute is not specified, it defaults to the bean named transactionManager). Internally, the process is wrapped in Spring’s native Transaction support, where TransactionInterceptor is responsible for handling transactions. For more information on how to configure a Transaction Manager, the types of Transaction Managers (e.g., JTA, DataSource, etc.), and other details related to transaction configuration, please refer to Spring’s Reference manual (Chapter 10 - Transaction Management).

With the above configuration, all Message flows initiated by this poller will be transactional. For more details on a Poller’s transactional configuration, please refer to Section 21.1.1, “Polling and Transactions”.

Along with transactions, several other cross-cutting concerns might need to be addressed when running a Poller. To help with that, the Poller element accepts an <advice-chain> sub-element, which allows you to define a custom chain of Advice instances to be applied to the Poller (see section 4.4 for more details). In Spring Integration 2.0, the Poller went through a refactoring effort and now uses a proxy mechanism to address transactional concerns as well as other cross-cutting concerns. One of the significant changes resulting from this effort is that the <transactional> and <advice-chain> elements are now mutually exclusive. The rationale is that if you need more than one advice, and one of them is Transaction advice, you can simply include it in the <advice-chain> with the same convenience as before but with much more control, since you can now position each advice in the desired order.

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <int:advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someOtherAdviceBean"/>
    <bean class="foo.bar.SampleAdvice"/>
  </int:advice-chain>
</int:poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

As you can see from the example above, we have provided a very basic XML-based configuration of Spring Transaction advice ("txAdvice") and included it within the <advice-chain> defined by the Poller. If you only need to address the transactional concerns of the Poller, you can still use the <transactional> element as a convenience.
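The effect of advice ordering can be illustrated outside of Spring. The following is a minimal plain-Java sketch, not Spring AOP code: it assumes an interceptor-style chain in which the first advice listed becomes the outermost wrapper, and all names (AdviceOrderSketch, applyChain, logging) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of an ordered advice chain; not Spring AOP code.
final class AdviceOrderSketch {

    /** Wraps the task in each advice; the first advice in the list ends up outermost. */
    static Runnable applyChain(List<UnaryOperator<Runnable>> chain, Runnable task) {
        Runnable wrapped = task;
        for (int i = chain.size() - 1; i >= 0; i--) {
            wrapped = chain.get(i).apply(wrapped);
        }
        return wrapped;
    }

    /** Advice that records when it is entered and when it is exited. */
    static UnaryOperator<Runnable> logging(String name, List<String> log) {
        return next -> () -> {
            log.add(name + ":before");
            try {
                next.run();
            } finally {
                log.add(name + ":after");
            }
        };
    }

    /** Runs a "poll" wrapped by two advices and returns the recorded call order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Runnable poll = applyChain(
                List.of(logging("tx", log), logging("other", log)),
                () -> log.add("poll"));
        poll.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the "tx" advice listed first, everything the other advice does happens inside the transaction-like wrapper; swapping the two entries would move it outside. That is the kind of control the <advice-chain> ordering is meant to give you.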

C.2 Transaction Boundaries

Another important factor is the boundaries of Transactions within a Message flow. When a transaction is started, the transaction context is bound to the current thread. So regardless of how many endpoints and channels you have in your Message flow your transaction context will be preserved as long as you are ensuring that the flow continues on the same thread. As soon as you break it by introducing a Pollable Channel or Executor Channel or initiate a new thread manually in some service, the Transactional boundary will be broken as well. Essentially the Transaction will END right there, and if a successful handoff has transpired between the threads, the flow would be considered a success and a COMMIT signal would be sent even though the flow will continue and might still result in an Exception somewhere downstream. If such a flow were synchronous, that Exception could be thrown back to the initiator of the Message flow who is also the initiator of the transactional context and the transaction would result in a ROLLBACK. The middle ground is to use transactional channels at any point where a thread boundary is being broken. For example, you can use a Queue-backed Channel that delegates to a transactional MessageStore strategy, or you could use a JMS-backed channel.
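The thread-bound nature of the transaction context can be demonstrated with plain JDK classes. The sketch below is an illustration under stated assumptions, not Spring code: a ThreadLocal named CURRENT_TX (hypothetical) stands in for the thread-bound transaction context, and a single-thread executor stands in for an Executor Channel hand-off.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of a thread-bound transaction context; not Spring code.
final class ThreadBoundContextSketch {

    // Stand-in for the transaction context that is bound to the current thread.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Returns what a downstream step sees on the calling thread and on a worker thread. */
    static String[] observe() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("tx-1");
        try {
            String sameThread = CURRENT_TX.get();           // direct hand-off: same thread
            Callable<String> downstream = CURRENT_TX::get;   // evaluated on the worker thread
            String otherThread = executor.submit(downstream).get();
            return new String[] { sameThread, otherThread };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TX.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("same thread: " + seen[0] + ", other thread: " + seen[1]);
    }
}
```

The step that runs on the calling thread still sees the context, while the step handed to the executor sees none. This is why work done after a thread hand-off is no longer covered by the original transaction.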

C.3 Transaction Synchronization

In some environments, it is advantageous to synchronize operations with a transaction that encompasses the entire flow. For example, consider a <file:inbound-channel-adapter/> at the start of a flow, that performs a number of database updates. If the transaction commits, we might want to move the file to a success directory, while we might want to move it to a failures directory if the transaction rolls back.

Spring Integration 2.2 introduces the capability of synchronizing these operations with a transaction. In addition, you can configure a PseudoTransactionManager if you don’t have a real transaction, but still want to perform different actions on success, or failure. For more information, see Section C.4, “Pseudo Transactions”.

The key strategy interfaces for this feature are:

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

The factory is responsible for creating a TransactionSynchronization object. You can implement your own, or use the one provided by the framework: DefaultTransactionSynchronizationFactory. This implementation returns a TransactionSynchronization that delegates to a default implementation of TransactionSynchronizationProcessor, the ExpressionEvaluatingTransactionSynchronizationProcessor. This processor supports three SpEL expressions, beforeCommitExpression, afterCommitExpression, and afterRollbackExpression.

These actions should be self-explanatory to those familiar with transactions. In each case, the #root variable is the original Message; in some cases, other SpEL variables are made available, depending on the MessageSource being polled by the poller. For example, the MongoDbMessageSource provides the #mongoTemplate variable which references the message source’s MongoTemplate; the RedisStoreMessageSource provides the #store variable which references the RedisStore created by the poll.

To enable the feature for a particular poller, you provide a reference to the TransactionSynchronizationFactory on the poller’s <transactional/> element using the synchronization-factory attribute.

Starting with version 5.0, a new PassThroughTransactionSynchronizationFactory is provided which is applied by default to polling endpoints when no TransactionSynchronizationFactory is configured but an advice of type TransactionInterceptor exists in the advice chain. When using any out-of-the-box TransactionSynchronizationFactory implementation, polling endpoints bind a polled message to the current transactional context and provide it as a failedMessage in a MessagingException if an exception is thrown after the TX advice. When using a custom TX advice that does not implement TransactionInterceptor, a PassThroughTransactionSynchronizationFactory can be configured explicitly to achieve this behavior. In either case, the MessagingException becomes the payload of the ErrorMessage that is sent to the errorChannel and the cause is the raw exception thrown by the advice. Previously, the ErrorMessage had a payload that was the raw exception thrown by the advice and did not provide a reference to the failedMessage information, making it difficult to determine the reasons for the transaction commit problem.

To simplify configuration of these components, namespace support for the default factory has been provided. Configuration is best described using an example:

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))" channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))" channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

The result of the SpEL evaluation is sent as the payload to either the committedChannel or rolledBackChannel (in this case, this would be Boolean.TRUE or Boolean.FALSE - the result of the java.io.File.renameTo() method call).

If you wish to send the entire payload for further Spring Integration processing, simply use the expression payload.

Important

It is important to understand that this simply synchronizes the actions with a transaction; it does not make a resource that is not inherently transactional actually transactional. Instead, the transaction (be it JDBC or otherwise) is started before the poll and committed or rolled back when the flow completes, followed by the synchronized action.

It is also important to understand that if you provide a custom TransactionSynchronizationFactory, it is responsible for creating a resource synchronization that will cause the bound resource to be unbound automatically, when the transaction completes. The default TransactionSynchronizationFactory does this by returning a subclass of ResourceHolderSynchronization, with the default shouldUnbindAtCompletion() returning true.

In addition to the after-commit and after-rollback expressions, before-commit is also supported. In that case, if the evaluation (or downstream processing) throws an exception, the transaction will be rolled back instead of being committed.

C.4 Pseudo Transactions

Referring to the above section, you may be thinking it would be useful to take these success or failure actions when a flow completes, even if there are no real transactional resources (such as JDBC) downstream of the poller. For example, consider a <file:inbound-channel-adapter/> followed by an <ftp:outbound-channel-adapter/>. Neither of these components is transactional, but we might want to move the input file to different directories, based on the success or failure of the FTP transfer.

To provide this functionality, the framework provides a PseudoTransactionManager, enabling the above configuration even when there is no real transactional resource involved. If the flow completes normally, the beforeCommit and afterCommit synchronizations are called; on failure, the afterRollback synchronization is called. Of course, because it is not a real transaction, there is no actual commit or rollback. The pseudo transaction is simply a vehicle used to enable the synchronization features.

To use a PseudoTransactionManager, simply define it as a <bean/>, in the same way you would configure a real transaction manager:

<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />
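The callback sequence that a pseudo transaction drives can be modeled in a few lines of plain Java. This is a simplified sketch and not the framework's implementation: the Callbacks interface and runWithSynchronization method are hypothetical and only loosely mirror the three synchronization phases described above. Unlike a real poller, the sketch swallows the failure after invoking the rollback callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of pseudo-transaction synchronization; not framework code.
final class PseudoTransactionSketch {

    /** Callback set loosely mirroring the three synchronization phases. */
    interface Callbacks {
        void beforeCommit(String message);
        void afterCommit(String message);
        void afterRollback(String message);
    }

    /** Runs the flow, then fires the callbacks that match its outcome. */
    static void runWithSynchronization(String message, Runnable flow, Callbacks callbacks) {
        try {
            flow.run();
            callbacks.beforeCommit(message); // an exception here also leads to afterRollback
        } catch (RuntimeException e) {
            callbacks.afterRollback(message); // nothing is really rolled back
            return;
        }
        callbacks.afterCommit(message);       // nothing is really committed
    }

    /** Records which callbacks fire for a succeeding or a failing flow. */
    static List<String> demo(boolean flowFails) {
        List<String> events = new ArrayList<>();
        Callbacks recording = new Callbacks() {
            public void beforeCommit(String m) { events.add("beforeCommit " + m); }
            public void afterCommit(String m) { events.add("afterCommit " + m); }
            public void afterRollback(String m) { events.add("afterRollback " + m); }
        };
        runWithSynchronization("report.csv",
                () -> { if (flowFails) throw new IllegalStateException("transfer failed"); },
                recording);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

In the file-to-FTP scenario, the afterCommit callback would correspond to moving the input file to a success directory and afterRollback to moving it to a failure directory.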