JDBC Support

Spring Integration provides channel adapters for receiving and sending messages by using database queries. Through those adapters, Spring Integration supports not only plain JDBC SQL queries but also stored procedure and stored function calls.

You need to include this dependency in your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
    <version>6.1.0-M2</version>
</dependency>
Gradle
implementation "org.springframework.integration:spring-integration-jdbc:6.1.0-M2"

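By default, the following JDBC components are available:

  • Inbound Channel Adapter

  • Outbound Channel Adapter

  • Outbound Gateway

  • Stored Procedures Inbound Channel Adapter

  • Stored Procedures Outbound Channel Adapter

  • Stored Procedures Outbound Gateway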

The Spring Integration JDBC Module also provides a JDBC Message Store.

Inbound Channel Adapter

The main function of an inbound channel adapter is to execute a SQL SELECT query and turn the result set into a message. The message payload is the whole result set (expressed as a List), and the types of the items in the list depend on the row-mapping strategy. The default strategy is a generic mapper that returns a Map for each row in the query result. Optionally, you can change this by adding a reference to a RowMapper instance (see the Spring JDBC documentation for more detailed information about row mapping).

If you want to convert rows in the SELECT query result to individual messages, you can use a downstream splitter.

The inbound adapter also requires a reference to either a JdbcTemplate instance or a DataSource.

As well as the SELECT statement to generate the messages, the adapter also has an UPDATE statement that marks the records as processed so that they do not show up in the next poll. The update can be parameterized by the list of IDs from the original select. By default, this is done through a naming convention (a column in the input result set called id is translated into a list in the parameter map for the update called id). The following example defines an inbound channel adapter with an update query and a DataSource reference.

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration. The underlying Spring JDBC features limit the available expressions (for example, most special characters other than a period are disallowed), but since the target is usually a list of objects (possibly a list of one) that are addressable by bean paths this is not unduly restrictive.
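To make the naming convention concrete, the following plain-Java sketch collects the id column from each polled row into the list that is bound to :id. It only illustrates the projection. The names IdProjectionSketch and project are hypothetical and are not part of Spring Integration, which performs this step internally through its parameter source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; not Spring Integration's implementation.
final class IdProjectionSketch {

    // Collects the value of one column from every polled row, which is
    // conceptually what ":id" receives in "where id in (:id)".
    static List<Object> project(List<Map<String, Object>> rows, String column) {
        List<Object> values = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            values.add(row.get(column));
        }
        return values;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(Map.of("id", 1, "status", 2));
        rows.add(Map.of("id", 7, "status", 2));
        System.out.println(project(rows, "id")); // prints [1, 7]
    }
}
```

With two polled rows whose ids are 1 and 7, the update statement in the preceding example would conceptually run with id in (1, 7).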

To change the parameter generation strategy, you can inject a SqlParameterSourceFactory into the adapter to override the default behavior (the adapter has a sql-parameter-source-factory attribute). Spring Integration provides ExpressionEvaluatingSqlParameterSourceFactory, which creates a SpEL-based parameter source, with the results of the query as the #root object. (If update-per-row is true, the root object is the row.) If the same parameter name appears multiple times in the update query, it is evaluated only once, and its result is cached.

You can also use a parameter source for the select query. In this case, since there is no “result” object to evaluate against, a single parameter source is used each time (rather than using a parameter source factory). Starting with version 4.0, you can use Spring to create a SpEL-based parameter source, as the following example shows:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

The value in each parameter expression can be any valid SpEL expression. The #root object for the expression evaluation is the constructor argument defined on the parameterSource bean. It is static for all evaluations (in the preceding example, an empty String).

Starting with version 5.0, you can supply ExpressionEvaluatingSqlParameterSourceFactory with sqlParameterTypes to specify the target SQL type for a particular parameter.

The following example provides SQL types for the parameters being used in the query:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
Use the createParameterSourceNoCache factory method. Otherwise, the parameter source caches the result of the evaluation. Also note that, because caching is disabled, if the same parameter name appears in the select query multiple times, it is re-evaluated for each occurrence.

Polling and Transactions

The inbound adapter accepts a regular Spring Integration poller as a child element. Consequently, the frequency of the polling can be controlled (among other uses). An important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, as the following example shows:

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
If you do not explicitly specify a poller, a default value is used (as is normal with Spring Integration, it can be defined as a top-level bean).

In the preceding example, the database is polled every 1000 milliseconds (or once a second), and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown. However, as long as it is aware of the data source, the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.

max-rows Versus max-messages-per-poll

The JDBC inbound channel adapter defines an attribute called max-rows. When you specify the adapter’s poller, you can also define a property called max-messages-per-poll. While these two attributes look similar, their meaning is quite different.

max-messages-per-poll specifies the number of times the query is executed per polling interval, whereas max-rows specifies the number of rows returned for each execution.

Under normal circumstances, you would likely not want to set the poller’s max-messages-per-poll property when you use the JDBC inbound channel adapter. Its default value is 1, which means that the JDBC inbound channel adapter’s receive() method is executed exactly once for each poll interval.

Setting the max-messages-per-poll attribute to a larger value means that the query is executed that many times back to back. For more information regarding the max-messages-per-poll attribute, see Configuring An Inbound Channel Adapter.

In contrast, the max-rows attribute, if greater than 0, specifies the maximum number of rows to be used from the query result set created by the receive() method. If the attribute is set to 0, all rows are included in the resulting message. The attribute defaults to 0.

It is recommended to use result set limiting via vendor-specific query options, for example MySQL LIMIT or SQL Server TOP or Oracle’s ROWNUM. See the particular vendor documentation for more information.
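The difference between the two limits can be sketched in plain Java. The following simulation is an illustration under stated assumptions, not the adapter's implementation. PollLimitsSketch, receive, and poll are hypothetical names, and the in-memory queue stands in for unprocessed rows. Each execution is assumed to mark the rows it returned as processed (as an update statement would), and the poll is assumed to end early when an execution finds no rows.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation only; not the adapter's implementation.
final class PollLimitsSketch {

    // One query execution: takes at most maxRows pending rows (0 means all)
    // and removes them, standing in for the select plus the update statement.
    static List<Integer> receive(Deque<Integer> pending, int maxRows) {
        List<Integer> payload = new ArrayList<>();
        while (!pending.isEmpty() && (maxRows <= 0 || payload.size() < maxRows)) {
            payload.add(pending.poll());
        }
        return payload;
    }

    // One polling interval: runs the query up to maxMessagesPerPoll times and
    // stops early once an execution finds no rows (assumed behavior).
    static List<List<Integer>> poll(Deque<Integer> pending, int maxRows, int maxMessagesPerPoll) {
        List<List<Integer>> messages = new ArrayList<>();
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            List<Integer> payload = receive(pending, maxRows);
            if (payload.isEmpty()) {
                break;
            }
            messages.add(payload);
        }
        return messages;
    }

    public static void main(String[] args) {
        Deque<Integer> pending = new ArrayDeque<>(List.of(1, 2, 3, 4, 5));
        // max-rows=2, max-messages-per-poll=2: two executions, two rows each
        System.out.println(poll(pending, 2, 2)); // prints [[1, 2], [3, 4]]
    }
}
```

With five pending rows, max-rows=2 and max-messages-per-poll=1 yield one message that carries two rows, whereas max-messages-per-poll=2 yields two such messages in the same polling interval.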

Outbound Channel Adapter

The outbound channel adapter is the inverse of the inbound: its role is to handle a message and use it to execute a SQL query. By default, the message payload and headers are available as input parameters to the query, as the following example shows:

<int-jdbc:outbound-channel-adapter
    query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
    data-source="dataSource"
    channel="input"/>

In the preceding example, messages arriving at the channel labelled input have a payload of a map with a key of something, so the [] operator dereferences that value from the map. The headers are also accessed as a map.

The parameters in the preceding query are bean property expressions on the incoming message (not SpEL expressions). This behavior is part of the SqlParameterSource, which is the default source created by the outbound adapter. You can inject a different SqlParameterSourceFactory to get different behavior.

The outbound adapter requires a reference to either a DataSource or a JdbcTemplate. You can also inject a SqlParameterSourceFactory to control the binding of each incoming message to a query.

If the input channel is a direct channel, the outbound adapter runs its query in the same thread and, therefore, the same transaction (if there is one) as the sender of the message.

Passing Parameters by Using SpEL Expressions

A common requirement for most JDBC channel adapters is to pass parameters as part of SQL queries or stored procedures or functions. As mentioned earlier, these parameters are by default bean property expressions, not SpEL expressions. However, if you need to pass SpEL expressions as parameters, you must explicitly inject a SqlParameterSourceFactory.

The following example uses an ExpressionEvaluatingSqlParameterSourceFactory to achieve that requirement:

<int-jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
    query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)"
    sql-parameter-source-factory="spelSource"/>

<bean id="spelSource"
      class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="id"          value="headers['id'].toString()"/>
            <entry key="createdDate" value="new java.util.Date()"/>
            <entry key="payload"     value="payload"/>
        </map>
    </property>
</bean>

For further information, see Defining Parameter Sources.

Using the PreparedStatement Callback

Sometimes, the flexibility and loose-coupling of SqlParameterSourceFactory does not do what we need for the target PreparedStatement or we need to do some low-level JDBC work. The Spring JDBC module provides APIs to configure the execution environment (such as ConnectionCallback or PreparedStatementCreator) and manipulate parameter values (such as SqlParameterSource). It can even access APIs for low-level operations, such as StatementCallback.

Starting with Spring Integration 4.2, MessagePreparedStatementSetter allows the specification of parameters on the PreparedStatement manually, in the requestMessage context. This class plays exactly the same role as PreparedStatementSetter in the standard Spring JDBC API. Actually, it is invoked directly from an inline PreparedStatementSetter implementation when the JdbcMessageHandler invokes execute on the JdbcTemplate.

This functional interface option is mutually exclusive with sqlParameterSourceFactory and can be used as a more powerful alternative to populate parameters of the PreparedStatement from the requestMessage. For example, it is useful when we need to store File data in a database BLOB column in a streaming manner. The following example shows how to do so:

@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
    JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
            "INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
    jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
        ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME, String.class));
        try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
            ps.setBlob(2, inputStream);
        }
        catch (Exception e) {
            throw new MessageHandlingException(m, e);
        }
        ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
    });
    return jdbcMessageHandler;
}

From the XML configuration perspective, the prepared-statement-setter attribute is available on the <int-jdbc:outbound-channel-adapter> component. It lets you specify a MessagePreparedStatementSetter bean reference.

Batch Update

Starting with version 5.1, the JdbcMessageHandler performs a JdbcOperations.batchUpdate() if the payload of the request message is an Iterable instance. Each element of the Iterable is wrapped in a Message with the headers from the request message, unless the element is already a Message. With a regular SqlParameterSourceFactory-based configuration, these messages are used to build the SqlParameterSource[] argument for the JdbcOperations.batchUpdate() call. When a MessagePreparedStatementSetter is configured, a BatchPreparedStatementSetter variant iterates over those messages and calls the provided MessagePreparedStatementSetter for each of them. Batch update is not supported when keysGenerated mode is selected.
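The wrapping rule for batch items can be illustrated with a small plain-Java sketch. It is not the handler's code. Msg is a hypothetical stand-in for a message (a payload plus headers), and toBatchItems is a hypothetical helper that applies the rule described above: elements that are already messages are kept as they are, and all other elements inherit the headers of the request message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; not the JdbcMessageHandler implementation.
final class BatchWrappingSketch {

    // Hypothetical stand-in for a message: a payload plus headers.
    record Msg(Object payload, Map<String, Object> headers) { }

    // Turns an Iterable payload into one item per element. Elements that are
    // already messages are kept; all others get the request headers.
    static List<Msg> toBatchItems(Msg request) {
        List<Msg> items = new ArrayList<>();
        for (Object element : (Iterable<?>) request.payload()) {
            items.add(element instanceof Msg m ? m : new Msg(element, request.headers()));
        }
        return items;
    }

    public static void main(String[] args) {
        Msg request = new Msg(List.of("a", "b"), Map.of("source", "demo"));
        System.out.println(toBatchItems(request).size()); // prints 2
    }
}
```

Each resulting item would then supply the parameters for one statement of the batch.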

Outbound Gateway

The outbound gateway is like a combination of the outbound and inbound adapters: Its role is to handle a message and use it to execute a SQL query and then respond with the result by sending it to a reply channel. By default, the message payload and headers are available as input parameters to the query, as the following example shows:

<int-jdbc:outbound-gateway
    update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource" />

The result of the preceding example is to insert a record into the mythings table and return a message that indicates the number of rows affected (the payload is a map: {UPDATED=1}) to the output channel.

If the update query is an insert with auto-generated keys, you can populate the reply message with the generated keys by adding keys-generated="true" to the preceding example (this is not the default because it is not supported by some database platforms). The following example shows the changed configuration:

<int-jdbc:outbound-gateway
    update="insert into mythings (status, name) values (0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource"
    keys-generated="true"/>

Instead of the update count or the generated keys, you can also provide a select query to execute and generate a reply message from the result (as the inbound adapter does), as the following example shows:

<int-jdbc:outbound-gateway
    update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    query="select * from foos where id=:headers[$id]"
    request-channel="input" reply-channel="output" data-source="dataSource"/>

Since Spring Integration 2.2, the update SQL query is no longer mandatory. You can now provide only a select query, by using either the query attribute or the query element. This is extremely useful if you need to actively retrieve data by using, for example, a generic gateway or a payload enricher. The reply message is then generated from the result (similar to how the inbound adapter works) and passed to the reply channel. The following example shows how to use the query attribute:

<int-jdbc:outbound-gateway
    query="select * from foos where id=:headers[id]"
    request-channel="input"
    reply-channel="output"
    data-source="dataSource"/>

By default, the component for the SELECT query returns only one (the first) row from the cursor. You can adjust this behavior with the max-rows option. If you need to return all the rows from the SELECT, consider specifying max-rows="0".

As with the channel adapters, you can also provide SqlParameterSourceFactory instances for request and reply. The default is the same as for the outbound adapter, so the request message is available as the root of an expression. If keys-generated="true", the root of the expression is the generated keys (a map if there is only one or a list of maps if multi-valued).

The outbound gateway requires a reference to either a DataSource or a JdbcTemplate. It can also have a SqlParameterSourceFactory injected to control the binding of the incoming message to the query.

Starting with version 4.2, the request-prepared-statement-setter attribute is available on the <int-jdbc:outbound-gateway> as an alternative to request-sql-parameter-source-factory. It lets you specify a MessagePreparedStatementSetter bean reference, which implements more sophisticated PreparedStatement preparation before its execution.

Starting with version 6.0, the JdbcOutboundGateway returns an empty list result as is, instead of converting it to null (meaning "no reply") as it did before. The previous behavior required extra configuration in applications where handling of empty lists is part of the downstream logic. See Splitter Discard Channel for one way to handle an empty list.

See Outbound Channel Adapter for more information about MessagePreparedStatementSetter.

JDBC Message Store

Spring Integration provides two JDBC-specific message store implementations. The JdbcMessageStore is suitable for use with aggregators and the claim check pattern. The JdbcChannelMessageStore implementation provides a more targeted and scalable implementation specifically for message channels.

Note that, although you can use a JdbcMessageStore to back a message channel, JdbcChannelMessageStore is optimized for that purpose.

Starting with versions 5.0.11, 5.1.2, the indexes for the JdbcChannelMessageStore have been optimized. If you have large message groups in such a store, you may wish to alter the indexes. Furthermore, the index for PriorityChannel is commented out because it is not needed unless you are using such channels backed by JDBC.
When using the OracleChannelMessageStoreQueryProvider, the priority channel index must be added because it is included in a hint in the query.

Initializing the Database

Before starting to use JDBC message store components, you should provision a target database with the appropriate objects.

Spring Integration ships with some sample scripts that can be used to initialize a database. In the spring-integration-jdbc JAR file, you can find scripts in the org.springframework.integration.jdbc package. It provides an example create and an example drop script for a range of common database platforms. A common way to use these scripts is to reference them in a Spring JDBC data source initializer. Note that the scripts are provided as samples and as specifications of the required table and column names. You may find that you need to enhance them for production use (for example, by adding index declarations).
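One possible way to wire a script into a data source initializer with Java configuration is sketched below. It assumes a PostgreSQL database and an existing DataSource bean, and the class and bean names are hypothetical. ResourceDatabasePopulator and DataSourceInitializer come from Spring JDBC. The script path combines the package named above with the schema-postgresql.sql file that ships in the module.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;

// Hypothetical configuration class; adjust the script to your database platform.
@Configuration
public class MessageStoreSchemaConfig {

    @Bean
    public DataSourceInitializer messageStoreSchemaInitializer(DataSource dataSource) {
        // Runs the sample create script when the application context starts.
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator(
                new ClassPathResource("org/springframework/integration/jdbc/schema-postgresql.sql"));
        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);
        initializer.setDatabasePopulator(populator);
        return initializer;
    }
}
```

Because a create script is likely to fail when the tables already exist, this kind of initializer is best suited to an empty schema, such as a test or development database.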

The Generic JDBC Message Store

The JDBC module provides an implementation of the Spring Integration MessageStore (important in the claim check pattern) and MessageGroupStore (important in stateful patterns such as an aggregator) backed by a database. Both interfaces are implemented by the JdbcMessageStore, and there is support for configuring store instances in XML, as the following example shows:

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

You can specify a JdbcTemplate instead of a DataSource.

The following example shows some other optional attributes:

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

In the preceding example, we have specified a LobHandler for dealing with messages as large objects (which is often necessary for Oracle) and a prefix for the table names in the queries generated by the store. The table name prefix defaults to INT_.

Backing Message Channels

If you intend to back message channels with JDBC, we recommend using the JdbcChannelMessageStore implementation. It works only in conjunction with Message Channels.

Supported Databases

The JdbcChannelMessageStore uses database-specific SQL queries to retrieve messages from the database. Therefore, you must set the ChannelMessageStoreQueryProvider property on the JdbcChannelMessageStore. This channelMessageStoreQueryProvider provides the SQL queries for the particular database you specify. Spring Integration provides support for the following relational databases:

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

If your database is not listed, you can extend the AbstractChannelMessageStoreQueryProvider class and provide your own custom queries.

Version 4.0 added the MESSAGE_SEQUENCE column to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.

Custom Message Insertion

Since version 5.0, by extending the ChannelMessageStorePreparedStatementSetter class, you can provide a custom implementation for message insertion in the JdbcChannelMessageStore. You can use it to set different columns or change the table structure or serialization strategy. For example, instead of the default serialization to byte[], you can store the message structure as a JSON string.

The following example uses the default implementation of setValues to store common columns and overrides the behavior to store the message payload as a varchar:

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.integration.jdbc.store.channel.ChannelMessageStorePreparedStatementSetter;
import org.springframework.messaging.Message;

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
            Object groupId, String region, boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

Generally, we do not recommend using a relational database for queuing. If possible, consider using either JMS- or AMQP-backed channels instead. For further reference, see the following resource:

If you are still planning to use your database as a queue, consider using PostgreSQL and its notification mechanism which is described in a subsequent section.

Concurrent Polling

When polling a message channel, you have the option to configure the associated Poller with a TaskExecutor reference.

Keep in mind, though, that if you use a JDBC-backed message channel and you plan to poll the channel (and, consequently, the message store) transactionally with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue and the expected performance gain from multiple threads may not materialize. For example, Apache Derby is problematic in that regard.

To achieve better JDBC queue throughput and avoid issues when different threads may poll the same Message from the queue, it is important to set the usingIdCache property of JdbcChannelMessageStore to true when using databases that do not support MVCC. The following example shows how to do so:

<bean id="queryProvider"
    class="org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />
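The idea behind the ID cache can be sketched in plain Java. This is a rough illustration of the concept, not the store's implementation, and IdCacheSketch, claim, and release are hypothetical names. A poller claims a message ID before handing the message out, other pollers skip IDs that are already claimed, and the ID is released once the transaction has committed or rolled back.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java illustration only; not the JdbcChannelMessageStore implementation.
final class IdCacheSketch {

    // IDs of messages that are currently being processed by some poller.
    private final Set<String> idCache = ConcurrentHashMap.newKeySet();

    // Returns the first candidate ID that no other poller has claimed yet.
    Optional<String> claim(Iterable<String> candidateIds) {
        for (String id : candidateIds) {
            if (idCache.add(id)) { // add() is atomic, so only one thread wins
                return Optional.of(id);
            }
        }
        return Optional.empty();
    }

    // Called after commit or rollback, like removeFromIdCache in the XML above.
    void release(String id) {
        idCache.remove(id);
    }

    public static void main(String[] args) {
        IdCacheSketch cache = new IdCacheSketch();
        List<String> ids = List.of("m1", "m2");
        System.out.println(cache.claim(ids)); // prints Optional[m1]
        System.out.println(cache.claim(ids)); // prints Optional[m2]
    }
}
```

The release step is why the preceding configuration registers removeFromIdCache for both after-commit and after-rollback: without it, an ID would stay claimed after its transaction ended.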
Priority Channel

Starting with version 4.0, JdbcChannelMessageStore implements PriorityCapableChannelMessageStore and provides the priorityEnabled option, letting it be used as a message-store reference for priority-queue instances. For this purpose, the INT_CHANNEL_MESSAGE table has a MESSAGE_PRIORITY column to store the value of PRIORITY message headers. In addition, a new MESSAGE_SEQUENCE column lets us achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond. Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE.
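The effect of that ordering clause can be reproduced with a plain-Java comparator. The following sketch is an illustration only. Row is a hypothetical holder whose fields mirror the MESSAGE_PRIORITY, CREATED_DATE, and MESSAGE_SEQUENCE columns, and nothing here is part of the message store itself.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration only; the real ordering happens in the SQL query.
final class PriorityOrderSketch {

    // Hypothetical row holder; messagePriority may be null when no header is set.
    record Row(String id, Integer messagePriority, long createdDate, long messageSequence) { }

    // Mirrors: order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
    static final Comparator<Row> POLL_ORDER = Comparator
            .comparing(Row::messagePriority, Comparator.nullsLast(Comparator.<Integer>reverseOrder()))
            .thenComparingLong(Row::createdDate)
            .thenComparingLong(Row::messageSequence);

    // Returns the row IDs in the order in which they would be polled.
    static List<String> pollOrder(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(POLL_ORDER);
        List<String> ids = new ArrayList<>();
        for (Row row : sorted) {
            ids.add(row.id());
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("a", null, 1L, 1L),  // no priority: polled last
                new Row("b", 5, 2L, 2L),     // same priority and timestamp as d
                new Row("c", 9, 3L, 3L),     // highest priority: polled first
                new Row("d", 5, 2L, 1L));    // lower sequence than b
        System.out.println(pollOrder(rows)); // prints [c, d, b, a]
    }
}
```

Rows b and d share a priority and a creation timestamp, so only the sequence value decides their order, which is the case that the MESSAGE_SEQUENCE column is meant to cover.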

We do not recommend using the same JdbcChannelMessageStore bean for priority and non-priority queue channels, because the priorityEnabled option applies to the entire store and proper FIFO queue semantics are not retained for the queue channel. However, the same INT_CHANNEL_MESSAGE table (and even region) can be used for both JdbcChannelMessageStore types. To configure that scenario, you can extend one message store bean from the other, as the following example shows:
<bean id="channelStore" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

Partitioning a Message Store

It is common to use a JdbcMessageStore as a global store for a group of applications or nodes in the same application. To provide some protection against name clashes and to give control over the database meta-data configuration, the message store lets the tables be partitioned in two ways. One way is to use separate table names, by changing the prefix (as described earlier). The other way is to specify a region name for partitioning data within a single table. An important use case for the second approach is when the MessageStore is managing persistent queues that back a Spring Integration Message Channel. The message data for a persistent channel is keyed in the store on the channel name. Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them. To avoid this danger, you can use the message store region to keep data separate for different physical channels that have the same logical name.
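The effect of a region can be pictured as an extra component of the lookup key. The following plain-Java sketch is an illustration only. RegionPartitionSketch and Key are hypothetical names, and an in-memory map stands in for the shared table. It shows two applications that use the same channel name without seeing each other's messages.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; an in-memory map stands in for the shared table.
final class RegionPartitionSketch {

    // Hypothetical composite key: the same channel name in two regions
    // yields two distinct keys.
    record Key(String region, String groupId) { }

    private final Map<Key, Deque<String>> table = new HashMap<>();

    void add(String region, String groupId, String message) {
        table.computeIfAbsent(new Key(region, groupId), k -> new ArrayDeque<>()).add(message);
    }

    // Returns the next message for this region and group, or null if there is none.
    String poll(String region, String groupId) {
        Deque<String> queue = table.get(new Key(region, groupId));
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        RegionPartitionSketch store = new RegionPartitionSketch();
        store.add("APP_A", "orders", "from A");
        store.add("APP_B", "orders", "from B");
        System.out.println(store.poll("APP_B", "orders")); // prints from B
    }
}
```

Without the region component, both applications would read from the same "orders" queue.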

PostgreSQL: Receiving Push Notifications

PostgreSQL offers a listen and notification framework for receiving push notifications upon database table manipulations. Spring Integration leverages this mechanism (starting with version 6.0) to allow for receiving push notifications when new messages are added to a JdbcChannelMessageStore. When using this feature, a database trigger must be defined, which can be found as part of the comments of the schema-postgresql.sql file which is included in the JDBC module of Spring Integration.

Push notifications are received via the PostgresChannelMessageTableSubscriber class, which allows its subscribers to receive a callback upon the arrival of new messages for any given region and groupId. These notifications are received even if a message was appended on a different JVM, but to the same database. The PostgresSubscribableChannel implementation uses a PostgresChannelMessageTableSubscriber.Subscription contract to pull messages from the store in reaction to notifications from the PostgresChannelMessageTableSubscriber.

For example, push notifications for some group can be received as follows:

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

Any active PostgresChannelMessageTableSubscriber occupies an exclusive JDBC Connection for the duration of its active life cycle. It is therefore important that this connection does not originate from a pooling DataSource. Such connection pools normally expect that issued connections are closed within a predefined timeout window.

Because of this need for an exclusive connection, we also recommend that a JVM run only a single PostgresChannelMessageTableSubscriber, which can be used to register any number of subscriptions.

Stored Procedures

In certain situations, plain JDBC support is not sufficient. Maybe you deal with legacy relational database schemas or you have complex data processing needs, but, ultimately, you have to use stored procedures or stored functions. Since Spring Integration 2.1, we provide three components to execute stored procedures or stored functions:

  • Stored Procedures Inbound Channel Adapter

  • Stored Procedures Outbound Channel Adapter

  • Stored Procedures Outbound Gateway

Supported Databases

In order to enable calls to stored procedures and stored functions, the stored procedure components use the org.springframework.jdbc.core.simple.SimpleJdbcCall class. Consequently, the following databases are fully supported for executing stored procedures:

  • Apache Derby

  • DB2

  • MySQL

  • Microsoft SQL Server

  • Oracle

  • PostgreSQL

  • Sybase

If you want to execute stored functions instead, the following databases are fully supported:

  • MySQL

  • Microsoft SQL Server

  • Oracle

  • PostgreSQL

Even though your particular database may not be fully supported, chances are that you can use the stored procedure Spring Integration components quite successfully anyway, provided your RDBMS supports stored procedures or stored functions.

As a matter of fact, some provided integration tests use the H2 database. Nevertheless, it is very important to thoroughly test those usage scenarios.

Configuration

The stored procedure components provide full XML Namespace support, and configuring them is similar to configuring the general-purpose JDBC components discussed earlier.

Common Configuration Attributes

All stored procedure components share certain configuration parameters:

  • auto-startup: Lifecycle attribute signaling whether this component should be started during application context startup. It defaults to true. Optional.

  • data-source: Reference to a javax.sql.DataSource, which is used to access the database. Required.

  • id: Identifies the underlying Spring bean definition, which is an instance of either EventDrivenConsumer or PollingConsumer, depending on whether the outbound channel adapter’s channel attribute references a SubscribableChannel or a PollableChannel. Optional.

  • ignore-column-meta-data: For fully supported databases, the underlying SimpleJdbcCall class can automatically retrieve the parameter information for the stored procedure or stored function from the JDBC metadata.

    However, if the database does not support metadata lookups or if you need to provide customized parameter definitions, this flag can be set to true. It defaults to false. Optional.

  • is-function: If true, a SQL Function is called. In that case, the stored-procedure-name or stored-procedure-name-expression attributes define the name of the called function. It defaults to false. Optional.

  • stored-procedure-name: This attribute specifies the name of the stored procedure. If the is-function attribute is set to true, this attribute specifies the function name instead. Either this property or stored-procedure-name-expression must be specified.

  • stored-procedure-name-expression: This attribute specifies the name of the stored procedure by using a SpEL expression. By using SpEL, you have access to the full message (if available), including its headers and payload. You can use this attribute to invoke different stored procedures at runtime. For example, you can provide stored procedure names that you would like to execute as a message header. The expression must resolve to a String.

    If the is-function attribute is set to true, this attribute specifies a stored function. Either this property or stored-procedure-name must be specified.

  • jdbc-call-operations-cache-size: Defines the maximum number of cached SimpleJdbcCallOperations instances. For each stored procedure name, a new SimpleJdbcCallOperations instance is created and then cached.

    Spring Integration 2.2 added the stored-procedure-name-expression attribute and the jdbc-call-operations-cache-size attribute.

    The default cache size is 10. A value of 0 disables caching. Negative values are not permitted.

    If you enable JMX, statistical information about the jdbc-call-operations-cache is exposed as an MBean. See MBean Exporter for more information.

  • sql-parameter-source-factory: (Not available for the stored procedure inbound channel adapter.) Reference to a SqlParameterSourceFactory. By default, bean properties of the passed in Message payload are used as a source for the stored procedure’s input parameters by using a BeanPropertySqlParameterSourceFactory.

    This may suffice for basic use cases. For more sophisticated options, consider passing in one or more ProcedureParameter values. See Defining Parameter Sources. Optional.

  • use-payload-as-parameter-source: (Not available for the stored procedure inbound channel adapter.) If set to true, the payload of the Message is used as a source for providing parameters. If set to false, however, the entire Message is available as a source for parameters.

    If no procedure parameters are passed in, this property defaults to true. This means that, by using a default BeanPropertySqlParameterSourceFactory, the bean properties of the payload are used as a source for parameter values for the stored procedure or stored function.

    However, if procedure parameters are passed in, this property (by default) evaluates to false. ProcedureParameter lets SpEL Expressions be provided. Therefore, it is highly beneficial to have access to the entire Message. The property is set on the underlying StoredProcExecutor. Optional.
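The caching behavior behind jdbc-call-operations-cache-size can be pictured with the following minimal sketch. It is not Spring Integration's implementation: the BoundedCallCache class, its factory function, and the least-recently-used eviction policy are assumptions made for illustration. Only the limits come from the attribute description above: a maximum size, 0 to disable caching, and no negative values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical bounded cache keyed by stored procedure name (illustration only).
class BoundedCallCache<V> {

    private final int maxSize;
    private final Function<String, V> factory;
    private final Map<String, V> cache;

    // Counts how many times the factory had to build a new instance.
    int creations = 0;

    BoundedCallCache(int maxSize, Function<String, V> factory) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("cache size must not be negative");
        }
        this.maxSize = maxSize;
        this.factory = factory;
        // Access-ordered map; eviction of the eldest entry is an assumed policy.
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > BoundedCallCache.this.maxSize;
            }
        };
    }

    synchronized V get(String procedureName) {
        if (maxSize == 0) {
            // A size of 0 disables caching: build a fresh instance on every call.
            creations++;
            return factory.apply(procedureName);
        }
        V value = cache.get(procedureName);
        if (value == null) {
            creations++;
            value = factory.apply(procedureName);
            cache.put(procedureName, value);
        }
        return value;
    }

    synchronized int size() {
        return cache.size();
    }
}
```

With a dynamic stored-procedure-name-expression, a cache that is smaller than the number of distinct procedure names in use causes instances to be rebuilt repeatedly, so the size is worth matching to the expected number of names.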

Common Configuration Sub-Elements

The stored procedure components share a common set of child elements that you can use to define and pass parameters to stored procedures or stored functions. The following elements are available:

  • parameter

  • returning-resultset

  • sql-parameter-definition

  • poller

  • parameter: Provides a mechanism to provide stored procedure parameters. Parameters can be either static or provided by using SpEL expressions.

    <int-jdbc:parameter name=""         (1)
                        type=""         (2)
                        value=""/>      (3)
    
    <int-jdbc:parameter name=""
                        expression=""/> (4)

    1 The name of the parameter to be passed into the stored procedure or stored function. Required.
    2 This attribute specifies the type of the value. If nothing is provided, this attribute defaults to java.lang.String. This attribute is used only when the value attribute is used. Optional.
    3 The value of the parameter. You must provide either this attribute or the expression attribute. Optional.
    4 Instead of the value attribute, you can specify a SpEL expression for passing the value of the parameter. If you specify the expression, the value attribute is not allowed. Optional.

    Optional.

  • returning-resultset: Stored procedures may return multiple result sets. By setting one or more returning-resultset elements, you can specify RowMappers to convert each returned ResultSet to meaningful objects. Optional.

    <int-jdbc:returning-resultset name="" row-mapper="" />

  • sql-parameter-definition: If you use a database that is fully supported, you typically do not have to specify the stored procedure parameter definitions. Instead, those parameters can be automatically derived from the JDBC metadata. However, if you use databases that are not fully supported, you must set those parameters explicitly by using the sql-parameter-definition element.

    You can also choose to turn off any processing of parameter metadata information obtained through JDBC by using the ignore-column-meta-data attribute.

    <int-jdbc:sql-parameter-definition
                                       name=""                           (1)
                                       direction="IN"                    (2)
                                       type="STRING"                     (3)
                                       scale="5"                         (4)
                                       type-name="FOO_STRUCT"            (5)
                                       return-type="fooSqlReturnType"/>  (6)
    1 Specifies the name of the SQL parameter. Required.
    2 Specifies the direction of the SQL parameter definition. Defaults to IN. Valid values are: IN, OUT, and INOUT. If your procedure is returning result sets, use the returning-resultset element. Optional.
    3 The SQL type used for this SQL parameter definition. Translates into an integer value, as defined by java.sql.Types. Alternatively, you can provide the integer value as well. If this attribute is not explicitly set, it defaults to 'VARCHAR'. Optional.
    4 The scale of the SQL parameter. Only used for numeric and decimal parameters. Optional.
    5 The typeName for types that are user-named, such as: STRUCT, DISTINCT, JAVA_OBJECT, and named array types. This attribute is mutually exclusive with the scale attribute. Optional.
    6 The reference to a custom value handler for complex types. An implementation of SqlReturnType. This attribute is mutually exclusive with the scale attribute and is only applicable for OUT and INOUT parameters. Optional.

  • poller: Lets you configure a message poller if this endpoint is a PollingConsumer. Optional.

Defining Parameter Sources

Parameter sources govern the techniques of retrieving and mapping the Spring Integration message properties to the relevant stored procedure input parameters.

The stored procedure components follow certain rules. By default, the bean properties of the Message payload are used as a source for the stored procedure’s input parameters. In that case, a BeanPropertySqlParameterSourceFactory is used. This may suffice for basic use cases. The next example illustrates that default behavior.

For the “automatic” lookup of bean properties by using the BeanPropertySqlParameterSourceFactory to work, your bean properties must be defined in lower case. This is due to the fact that in org.springframework.jdbc.core.metadata.CallMetaDataContext (the Java method is matchInParameterValuesWithCallParameters()), the retrieved stored procedure parameter declarations are converted to lower case. As a result, if you have camel-case bean properties (such as lastName), the lookup fails. In that case, provide an explicit ProcedureParameter.

Suppose we have a payload that consists of a simple bean with the following three properties: id, name, and description. Furthermore, we have a simplistic Stored Procedure called INSERT_COFFEE that accepts three input parameters: id, name, and description. We also use a fully supported database. In that case, the following configuration for a stored procedure outbound adapter suffices:

<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
    channel="insertCoffeeProcedureRequestChannel"
    stored-procedure-name="INSERT_COFFEE"/>
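As a rough illustration of the payload described above and of the lower-case rule, the following sketch defines a Coffee bean and a simplified name lookup. It is not the code used by BeanPropertySqlParameterSourceFactory or CallMetaDataContext: the Coffee, Person, and PropertyLookupSketch classes are hypothetical, and the lookup only models the idea that declared parameter names are lower-cased before being matched against bean property names.

```java
import java.beans.Introspector;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical payload: every property name is lower case (id, name, description).
class Coffee {

    private final int id;
    private final String name;
    private final String description;

    Coffee(int id, String name, String description) {
        this.id = id;
        this.name = name;
        this.description = description;
    }

    public int getId() { return id; }

    public String getName() { return name; }

    public String getDescription() { return description; }
}

// Hypothetical counter-example with a camel-case property (lastName).
class Person {
    public String getLastName() { return "Doe"; }
}

class PropertyLookupSketch {

    // Collects bean property names from the public getXxx() methods of a class.
    static Set<String> propertyNames(Class<?> type) {
        Set<String> names = new TreeSet<>();
        for (Method method : type.getDeclaredMethods()) {
            boolean getter = Modifier.isPublic(method.getModifiers())
                    && method.getParameterCount() == 0
                    && method.getName().startsWith("get")
                    && method.getName().length() > 3;
            if (getter) {
                names.add(Introspector.decapitalize(method.getName().substring(3)));
            }
        }
        return names;
    }

    // Simplified model: the declared parameter name is lower-cased, then looked up as-is.
    static boolean resolves(Class<?> type, String declaredParameter) {
        String lowered = declaredParameter.toLowerCase(Locale.ROOT);
        return propertyNames(type).contains(lowered);
    }
}
```

Under this model, a declared parameter such as DESCRIPTION resolves against Coffee, whereas LASTNAME does not resolve against Person, because the lower-cased name lastname differs from the property name lastName.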

For more sophisticated options, consider passing in one or more ProcedureParameter values.

If you do provide ProcedureParameter values explicitly, by default, an ExpressionEvaluatingSqlParameterSourceFactory is used for parameter processing, to enable the full power of SpEL expressions.

If you need even more control over how parameters are retrieved, consider passing in a custom implementation of SqlParameterSourceFactory by using the sql-parameter-source-factory attribute.

Stored Procedure Inbound Channel Adapter

The following listing calls out the attributes that matter for a stored procedure inbound channel adapter:

<int-jdbc:stored-proc-inbound-channel-adapter
                                   channel=""                                    (1)
                                   stored-procedure-name=""
                                   data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   skip-undeclared-results=""                    (2)
                                   return-value-required="false">                (3)
    <int:poller/>
    <int-jdbc:sql-parameter-definition name="" direction="IN"
                                               type="STRING"
                                               scale=""/>
    <int-jdbc:parameter name="" type="" value=""/>
    <int-jdbc:parameter name="" expression=""/>
    <int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
1 Channel to which polled messages are sent. If the stored procedure or function does not return any data, the payload of the Message is null. Required.
2 If this attribute is set to true, all results from a stored procedure call that do not have a corresponding SqlOutParameter declaration are bypassed. For example, stored procedures can return an update count value, even though your stored procedure declared only a single result parameter. The exact behavior depends on the database implementation. The value is set on the underlying JdbcTemplate. The value defaults to true. Optional.
3 Indicates whether this procedure’s return value should be included. Since Spring Integration 3.0. Optional.

Stored Procedure Outbound Channel Adapter

The following listing calls out the attributes that matter for a stored procedure outbound channel adapter:

<int-jdbc:stored-proc-outbound-channel-adapter channel=""                        (1)
                                               stored-procedure-name=""
                                               data-source=""
                                               auto-startup="true"
                                               id=""
                                               ignore-column-meta-data="false"
                                               order=""                          (2)
                                               sql-parameter-source-factory=""
                                               use-payload-as-parameter-source="">
    <int:poller fixed-rate=""/>
    <int-jdbc:sql-parameter-definition name=""/>
    <int-jdbc:parameter name=""/>

</int-jdbc:stored-proc-outbound-channel-adapter>
1 The receiving message channel of this endpoint. Required.
2 Specifies the order for invocation when this endpoint is connected as a subscriber to a channel. This is particularly relevant when that channel is using a failover dispatching strategy. It has no effect when this endpoint is itself a polling consumer for a channel with a queue. Optional.

Stored Procedure Outbound Gateway

The following listing calls out the attributes that matter for a stored procedure outbound gateway:

<int-jdbc:stored-proc-outbound-gateway request-channel=""                        (1)
                                       stored-procedure-name=""
                                       data-source=""
                                       auto-startup="true"
                                       id=""
                                       ignore-column-meta-data="false"
                                       is-function="false"
                                       order=""
                                       reply-channel=""                          (2)
                                       reply-timeout=""                          (3)
                                       return-value-required="false"             (4)
                                       skip-undeclared-results=""                (5)
                                       sql-parameter-source-factory=""
                                       use-payload-as-parameter-source="">
    <int-jdbc:sql-parameter-definition name="" direction="IN"
                                       type=""
                                       scale="10"/>
    <int-jdbc:sql-parameter-definition name=""/>
    <int-jdbc:parameter name="" type="" value=""/>
    <int-jdbc:parameter name="" expression=""/>
    <int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-outbound-gateway>
1 The receiving message channel of this endpoint. Required.
2 Message channel to which replies should be sent after receiving the database response. Optional.
3 Lets you specify how long this gateway waits for the reply message to be sent successfully before throwing an exception. Keep in mind that, when sending to a DirectChannel, the invocation occurs in the sender’s thread. Consequently, the failing of the send operation may be caused by other components further downstream. The value is specified in milliseconds. Optional.
4 Indicates whether this procedure’s return value should be included. Optional.
5 If the skip-undeclared-results attribute is set to true, all results from a stored procedure call that do not have a corresponding SqlOutParameter declaration are bypassed. For example, stored procedures may return an update count value, even though your stored procedure only declared a single result parameter. The exact behavior depends on the database. The value is set on the underlying JdbcTemplate. The value defaults to true. Optional.

Examples

This section contains two examples that call Apache Derby stored procedures. The first example calls a stored procedure that returns a ResultSet. By using a RowMapper, the data is converted into a domain object, which then becomes the Spring Integration message payload.

In the second sample, we call a stored procedure that uses output parameters to return data instead.

The Spring Integration Samples project contains the Apache Derby example referenced here, as well as instructions on how to run it. It also provides an example of using Oracle stored procedures.

In the first example, we call a stored procedure named FIND_ALL_COFFEE_BEVERAGES that does not define any input parameters but that returns a ResultSet.

In Apache Derby, stored procedures are implemented in Java. The following listing shows the method signature:

public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
            throws SQLException {
    ...
}

The following listing shows the corresponding SQL:

CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';

In Spring Integration, you can now call this stored procedure by using, for example, a stored-proc-outbound-gateway, as the following example shows:

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
                                       data-source="dataSource"
                                       request-channel="findAllProcedureRequestChannel"
                                       expect-single-result="true"
                                       stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
    <int-jdbc:returning-resultset name="coffeeBeverages"
        row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>

In the second example, we call a stored procedure named FIND_COFFEE that has one input parameter. Instead of returning a ResultSet, it uses an output parameter. The following example shows the method signature:

public static void findCoffee(int coffeeId, String[] coffeeDescription)
            throws SQLException {
    ...
}

The following listing shows the corresponding SQL:

CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';

In Spring Integration, you can now call this stored procedure by using, for example, a stored-proc-outbound-gateway, as the following example shows:

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
                                       data-source="dataSource"
                                       request-channel="findCoffeeProcedureRequestChannel"
                                       skip-undeclared-results="true"
                                       stored-procedure-name="FIND_COFFEE"
                                       expect-single-result="true">
    <int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>

JDBC Lock Registry

Version 4.3 introduced the JdbcLockRegistry. Certain components (for example, aggregator and resequencer) use a lock obtained from a LockRegistry instance to ensure that only one thread manipulates a group at a time. The DefaultLockRegistry performs this function within a single component. You can now configure an external lock registry on these components. When used with a shared MessageGroupStore, you can use the JdbcLockRegistry to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.

When a lock is released by a local thread, another local thread can generally acquire the lock immediately. If a lock is released by a thread that uses a different registry instance, it can take up to 100ms to acquire the lock.

The JdbcLockRegistry is based on the LockRepository abstraction, which has a DefaultLockRepository implementation. The database schema scripts are located in the org.springframework.integration.jdbc package, with a separate script for each supported RDBMS vendor. For example, the following listing shows the H2 DDL for the lock table:

CREATE TABLE INT_LOCK  (
    LOCK_KEY CHAR(36),
    REGION VARCHAR(100),
    CLIENT_ID CHAR(36),
    CREATED_DATE TIMESTAMP NOT NULL,
    constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);

The INT_ prefix can be changed according to the target database design requirements. If you change it, you must set the prefix property on the DefaultLockRepository bean definition accordingly.

Sometimes, an application gets into a state in which it can neither release the distributed lock nor remove the corresponding record from the database. Such dead locks can be expired by another application on its next locking invocation. The timeToLive (TTL) option on the DefaultLockRepository is provided for this purpose. You may also want to specify the CLIENT_ID for the locks stored by a given DefaultLockRepository instance. If so, you can specify the id to be associated with the DefaultLockRepository as a constructor parameter.
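The effect of the time-to-live option can be modeled with a small in-memory stand-in for the lock table. This is a simplified sketch and not the DefaultLockRepository code: the LockTableSketch class and its methods are hypothetical, a map replaces the INT_LOCK table, and the current time is passed in explicitly so that the behavior is easy to follow.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a lock table with LOCK_KEY, CLIENT_ID and CREATED_DATE.
class LockTableSketch {

    private static final class Row {
        final String clientId;
        final Instant createdDate;

        Row(String clientId, Instant createdDate) {
            this.clientId = clientId;
            this.createdDate = createdDate;
        }
    }

    private final Map<String, Row> rows = new HashMap<>();
    private final Duration timeToLive;

    LockTableSketch(Duration timeToLive) {
        this.timeToLive = timeToLive;
    }

    // Returns true if clientId holds the lock after this call.
    synchronized boolean acquire(String lockKey, String clientId, Instant now) {
        Row row = rows.get(lockKey);
        // A record older than the time to live is treated as dead and removed.
        if (row != null && row.createdDate.plus(timeToLive).isBefore(now)) {
            rows.remove(lockKey);
            row = null;
        }
        if (row == null || row.clientId.equals(clientId)) {
            rows.put(lockKey, new Row(clientId, now));
            return true;
        }
        return false;
    }

    // Returns the client that currently owns the record, or null if there is none.
    synchronized String holder(String lockKey) {
        Row row = rows.get(lockKey);
        return row == null ? null : row.clientId;
    }
}
```

In this model, a second client is refused while the first client's record is still within its time to live and succeeds once the record has expired, which is how a lock left behind by a crashed instance eventually becomes available again.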

Starting with version 5.1.8, the JdbcLockRegistry can be configured with the idleBetweenTries option, a Duration to sleep between lock record insert or update attempts. The default is 100 milliseconds. In some environments, that interval is too short, because instances that do not hold the lock keep occupying data source connections too often.

Starting with version 5.4, the RenewableLockRegistry interface has been introduced and added to JdbcLockRegistry. If the locked process may run longer than the lock's time to live, the renewLock() method must be called during that process. As a result, the time to live can be greatly reduced, and deployments can retake a lost lock quickly.
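The role of the idle interval between lock attempts can be sketched as a simple retry loop. This is an illustration only, not the JdbcLockRegistry implementation: the IdleRetrySketch class, the acquireWithIdle method, and the maxTries limit are hypothetical, and the attempt itself is abstracted as a BooleanSupplier.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical retry loop that sleeps between failed lock attempts.
class IdleRetrySketch {

    // Returns the number of the successful try, or -1 if no try succeeded.
    static int acquireWithIdle(BooleanSupplier attempt, Duration idleBetweenTries, int maxTries) {
        for (int tries = 1; tries <= maxTries; tries++) {
            if (attempt.getAsBoolean()) {
                return tries;
            }
            if (tries < maxTries) {
                try {
                    // A longer idle period means fewer queries against the database.
                    Thread.sleep(idleBetweenTries.toMillis());
                }
                catch (InterruptedException ex) {
                    // Restore the interrupt flag and give up.
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

The trade-off is between load and latency: a longer idle period reduces the number of statements issued by waiting instances, but a waiting instance may notice a released lock later.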

The lock renewal can be done only if the lock is held by the current thread.
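The interplay between a short time to live and renewal can be pictured with the following sketch. It is a simplified model and not the JdbcLockRegistry or DefaultLockRepository code: the RenewableLeaseSketch class is hypothetical, the owner is identified by a plain string instead of a thread, and a refused renewal is reported by returning false.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical lease whose creation date is refreshed on renewal.
class RenewableLeaseSketch {

    private final Duration timeToLive;
    private final String owner;
    private Instant createdDate;

    RenewableLeaseSketch(Duration timeToLive, String owner, Instant acquiredAt) {
        this.timeToLive = timeToLive;
        this.owner = owner;
        this.createdDate = acquiredAt;
    }

    // Only the current owner may renew; renewal restarts the time-to-live window.
    synchronized boolean renew(String caller, Instant now) {
        if (!owner.equals(caller)) {
            return false;
        }
        createdDate = now;
        return true;
    }

    // The lease is expired once the time to live has fully elapsed.
    synchronized boolean isExpired(Instant now) {
        return createdDate.plus(timeToLive).isBefore(now);
    }
}
```

A long-running holder that renews more often than the time to live keeps its lease alive, while a holder that stops renewing (for example, because it crashed) loses the lease after one time-to-live period.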

Starting with version 5.5.6, the JdbcLockRegistry supports automatic cleanup of the JdbcLock cache held in JdbcLockRegistry.locks, configured through JdbcLockRegistry.setCacheCapacity(). See its JavaDocs for more information.

Starting with version 6.0, the DefaultLockRepository can be supplied with a PlatformTransactionManager instead of relying on the primary bean from the application context.

JDBC Metadata Store

Version 5.0 introduced the JDBC MetadataStore (see Metadata Store) implementation. You can use the JdbcMetadataStore to maintain the metadata state across application restarts. This MetadataStore implementation can be used with adapters such as the following:

To configure these adapters to use the JdbcMetadataStore, declare a Spring bean by using a bean name of metadataStore. The Feed inbound channel adapter automatically picks up and uses the declared JdbcMetadataStore, as the following example shows:

@Bean
public MetadataStore metadataStore(DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
}

The org.springframework.integration.jdbc package has database schema scripts for several RDBMS vendors. For example, the following listing shows the H2 DDL for the metadata table:

CREATE TABLE INT_METADATA_STORE  (
	METADATA_KEY VARCHAR(255) NOT NULL,
	METADATA_VALUE VARCHAR(4000),
	REGION VARCHAR(100) NOT NULL,
	constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);

You can change the INT_ prefix to match the target database design requirements. You can also configure JdbcMetadataStore to use the custom prefix.

The JdbcMetadataStore implements ConcurrentMetadataStore, letting it be reliably shared across multiple application instances, where only one instance can store or modify a key’s value. All of these operations are atomic, thanks to transaction guarantees.
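The atomic operations referred to above can be illustrated with an in-memory stand-in. This is only a sketch of the semantics, not the JdbcMetadataStore code: the MetadataStoreSketch class is hypothetical, its method names mirror the putIfAbsent and replace operations of ConcurrentMetadataStore as understood here, a ConcurrentHashMap takes the place of the database table, and the key and values are made up.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of compare-and-set style metadata operations.
class MetadataStoreSketch {

    private final ConcurrentMap<String, String> entries = new ConcurrentHashMap<>();

    // Stores the value only if the key is absent; returns the existing value otherwise.
    String putIfAbsent(String key, String value) {
        return entries.putIfAbsent(key, value);
    }

    // Replaces the value only if the current value equals oldValue.
    boolean replace(String key, String oldValue, String newValue) {
        return entries.replace(key, oldValue, newValue);
    }

    String get(String key) {
        return entries.get(key);
    }
}
```

When two application instances race to record the same key, only the first putIfAbsent call returns null, so the other instance can tell that the entry was already claimed. A replace call succeeds only for the caller that still knows the current value.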

You must use transaction management with the JdbcMetadataStore. Inbound channel adapters can be supplied with a reference to the TransactionManager in the poller configuration. Unlike non-transactional MetadataStore implementations, with JdbcMetadataStore, the entry appears in the target table only after the transaction commits. When a rollback occurs, no entries are added to the INT_METADATA_STORE table.

Since version 5.0.7, you can configure the JdbcMetadataStore with the RDBMS vendor-specific lockHint option for lock-based queries on metadata store entries. By default, it is FOR UPDATE. It can be set to an empty string if the target database does not support row locking. Consult your vendor's documentation for the hints that can be used in a SELECT expression to lock rows before updates.