19. JDBC Support

Spring Integration provides Channel Adapters for receiving and sending messages via database queries. Through those adapters Spring Integration supports not only plain JDBC SQL Queries, but also Stored Procedure and Stored Function calls.

The following JDBC components are available by default:

Furthermore, the Spring Integration JDBC Module also provides a JDBC Message Store

19.1 Inbound Channel Adapter

The main function of an inbound Channel Adapter is to execute a SQL SELECT query and turn the result set as a message. The message payload is the whole result set, expressed as a List, and the types of the items in the list depend on the row-mapping strategy that is used. The default strategy is a generic mapper that just returns a Map for each row in the query result. Optionally, this can be changed by adding a reference to a RowMapper instance (see the Spring JDBC documentation for more detailed information about row mapping).

[Note]Note

If you want to convert rows in the SELECT query result to individual messages you can use a downstream splitter.

The inbound adapter also requires a reference to either a JdbcTemplate instance or a DataSource.

As well as the SELECT statement to generate the messages, the adapter above also has an UPDATE statement that is being used to mark the records as processed so that they don’t show up in the next poll. The update can be parameterized by the list of ids from the original select. This is done through a naming convention by default (a column in the input result set called "id" is translated into a list in the parameter map for the update called "id"). The following example defines an inbound Channel Adapter with an update query and a DataSource reference.

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
[Note]Note

The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which in this case is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC combined with a convention (projection onto the polled result list) adopted in Spring Integration. The underlying Spring JDBC features limit the available expressions (e.g. most special characters other than period are disallowed), but since the target is usually a list of or an individual object addressable by simple bean paths this isn’t unduly restrictive.

To change the parameter generation strategy you can inject a SqlParameterSourceFactory into the adapter to override the default behavior (the adapter has a sql-parameter-source-factory attribute). Spring Integration provides a ExpressionEvaluatingSqlParameterSourceFactory which will create a SpEL-based parameter source, with the results of the query as the #root object. (If update-per-row is true, the root object is the row). If the same parameter name appears multiple times in the update query, it is evaluated only one time, and its result is cached.

You can also use a parameter source for the select query. In this case, since there is no "result" object to evaluate against, a single parameter source is used each time (rather than using a parameter source factory). Starting with version 4.0, you can use Spring to create a SpEL based parameter source as follows:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

The value in each parameter expression can be any valid SpEL expression. The #root object for the expression evaluation is the constructor argument defined on the parameterSource bean. It is static for all evaluations (in this case, an empty String).

Starting with version 5.0, the ExpressionEvaluatingSqlParameterSourceFactory can be supplied with the sqlParameterTypes to specify the target SQL type for the particular parameter.

Below example provides sql type for the parameters being used in the query.

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
[Important]Important

Use the createParameterSourceNoCache factory method; otherwise the parameter source will cache the result of the evaluation. Also note that, because caching is disabled, if the same parameter name appears in the select query multiple times, it will be re-evaluated for each occurrence.

19.1.1 Polling and Transactions

The inbound adapter accepts a regular Spring Integration poller as a sub element, so for instance the frequency of the polling can be controlled. A very important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, for example:

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
[Note]Note

If a poller is not explicitly specified, a default value will be used (and as per normal with Spring Integration can be defined as a top level bean).

In this example the database is polled every 1000 milliseconds, and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown, but as long as it is aware of the data source then the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread, and hence the same transaction. Then if any of them fail, the transaction rolls back and the input data is reverted to its original state.

19.1.2 Max-rows-per-poll versus Max-messages-per-poll

The JDBC Inbound Channel Adapter defines an attribute max-rows-per-poll. When you specify the adapter’s Poller, you can also define a property called max-messages-per-poll. While these two attributes look similar, their meaning is quite different.

max-messages-per-poll specifies the number of times the query is executed per polling interval, whereas max-rows-per-poll specifies the number of rows returned for each execution.

Under normal circumstances, you would likely not want to set the Poller’s max-messages-per-poll property when using the JDBC Inbound Channel Adapter. Its default value is 1, which means that the JDBC Inbound Channel Adapter's receive() method is executed exactly once for each poll interval.

Setting the max-messages-per-poll attribute to a larger value means that the query is executed that many times back to back. For more information regarding the max-messages-per-poll attribute, please see Section 4.3.1, “Configuring An Inbound Channel Adapter”.

In contrast, the max-rows-per-poll attribute, if greater than 0, specifies the maximum number of rows that will be used from the query result set, per execution of the receive() method. If the attribute is set to 0, then all rows will be included in the resulting message. If not explicitly set, the attribute defaults to 0.

19.2 Outbound Channel Adapter

The outbound Channel Adapter is the inverse of the inbound: its role is to handle a message and use it to execute a SQL query. The message payload and headers are available by default as input parameters to the query, for instance:

<int-jdbc:outbound-channel-adapter
    query="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    data-source="dataSource"
    channel="input"/>

In the example above, messages arriving on the channel labelled input have a payload of a map with key foo, so the [] operator dereferences that value from the map. The headers are also accessed as a map.

[Note]Note

The parameters in the query above are bean property expressions on the incoming message (not Spring EL expressions). This behavior is part of the SqlParameterSource which is the default source created by the outbound adapter. Other behavior is possible in the adapter, and requires the user to inject a different SqlParameterSourceFactory.

The outbound adapter requires a reference to either a DataSource or a JdbcTemplate. It can also have a SqlParameterSourceFactory injected to control the binding of each incoming message to a query.

If the input channel is a direct channel, then the outbound adapter runs its query in the same thread, and therefore the same transaction (if there is one) as the sender of the message.

Passing Parameters using SpEL Expressions

A common requirement for most JDBC Channel Adapters is to pass parameters as part of Sql queries or Stored Procedures/Functions. As mentioned above, these parameters are by default bean property expressions, not SpEL expressions. However, if you need to pass SpEL expression as parameters, you must inject a SqlParameterSourceFactory explicitly.

The following example uses a ExpressionEvaluatingSqlParameterSourceFactory to achieve that requirement.

<jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
    query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE)     \
    values (:id, :payload, :createdDate)"
    sql-parameter-source-factory="spelSource"/>

<bean id="spelSource"
      class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="id"          value="headers['id'].toString()"/>
            <entry key="createdDate" value="new java.util.Date()"/>
            <entry key="payload"     value="payload"/>
        </map>
    </property>
</bean>

For further information, please also see Section 19.5.5, “Defining Parameter Sources”

PreparedStatement Callback

There are some cases when the flexibility and loose-coupling of SqlParameterSourceFactory isn’t enough for the target PreparedStatement or we need to do some low-level JDBC work. The Spring JDBC module provides APIs to configure the execution environment (e.g. ConnectionCallback or PreparedStatementCreator) and manipulation of parameter values (e.g. SqlParameterSource). Or even APIs for low level operations, for example StatementCallback.

Starting with Spring Integration 4.2, the MessagePreparedStatementSetter is available to allow the specification of parameters on the PreparedStatement manually, in the requestMessage context. This class plays exactly the same role as PreparedStatementSetter in the standard Spring JDBC API. Actually it is invoked directly from an inline PreparedStatementSetter implementation, when the JdbcMessageHandler invokes execute on the JdbcTemplate.

This functional interface option is mutually exclusive with sqlParameterSourceFactory and can be used as a more powerful alternative to populate parameters of the PreparedStatement from the requestMessage. For example it is useful when we need to store File data to the DataBase BLOB column in a stream manner:

@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
    JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
            "INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
    jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
        ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME));
        try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
            ps.setBlob(2, inputStream);
        }
        catch (Exception e) {
            throw new MessageHandlingException(m, e);
        }
        ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
    });
    return jdbcMessageHandler;
}

From the XML configuration perspective, the prepared-statement-setter attribute is available on the <int-jdbc:outbound-channel-adapter> component, to specify a MessagePreparedStatementSetter bean reference.

19.3 Outbound Gateway

The outbound Gateway is like a combination of the outbound and inbound adapters: its role is to handle a message and use it to execute a SQL query and then respond with the result sending it to a reply channel. The message payload and headers are available by default as input parameters to the query, for instance:

<int-jdbc:outbound-gateway
    update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    request-channel="input" reply-channel="output" data-source="dataSource" />

The result of the above would be to insert a record into the "foos" table and return a message to the output channel indicating the number of rows affected (the payload is a map: {UPDATED=1}).

If the update query is an insert with auto-generated keys, the reply message can be populated with the generated keys by adding keys-generated="true" to the above example (this is not the default because it is not supported by some database platforms). For example:

<int-jdbc:outbound-gateway
    update="insert into foos (status, name) values (0, :payload[foo])"
    request-channel="input" reply-channel="output" data-source="dataSource"
    keys-generated="true"/>

Instead of the update count or the generated keys, you can also provide a select query to execute and generate a reply message from the result (like the inbound adapter), e.g:

<int-jdbc:outbound-gateway
    update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    query="select * from foos where id=:headers[$id]"
    request-channel="input" reply-channel="output" data-source="dataSource"/>

Since Spring Integration 2.2 the update SQL query is no longer mandatory. You can now solely provide a select query, using either the query attribute or the query sub-element. This is extremely useful if you need to actively retrieve data using e.g. a generic Gateway or a Payload Enricher. The reply message is then generated from the result, like the inbound adapter, and passed to the reply channel.

<int-jdbc:outbound-gateway
    query="select * from foos where id=:headers[id]"
    request-channel="input"
    reply-channel="output"
    data-source="dataSource"/>
[Important]Important

By default the component for the SELECT query returns only one, first row from the cursor. This can be adjusted with the max-rows-per-poll option. Consider to specify max-rows-per-poll="0" if you need to return all the rows from the SELECT.

As with the channel adapters, you can also provide SqlParameterSourceFactory instances for request and reply. The default is the same as for the outbound adapter, so the request message is available as the root of an expression. If keys-generated="true" then the root of the expression is the generated keys (a map if there is only one or a list of maps if multi-valued).

The outbound gateway requires a reference to either a DataSource or a JdbcTemplate. It can also have a SqlParameterSourceFactory injected to control the binding of the incoming message to the query.

Starting with the version 4.2 the request-prepared-statement-setter attribute is available on the <int-jdbc:outbound-gateway> as an alternative to the request-sql-parameter-source-factory. It allows you to specify a MessagePreparedStatementSetter bean reference, which implements more sophisticated PreparedStatement preparation before its execution.

See Section 19.2, “Outbound Channel Adapter” for more information about MessagePreparedStatementSetter.

19.4 JDBC Message Store

Spring Integration provides 2 JDBC specific Message Store implementations. The first one, is the JdbcMessageStore which is suitable to be used in conjunction with Aggregators and the Claim-Check pattern. While it can be used for backing Message Channels as well, you may want to consider using the JdbcChannelMessageStore implementation instead, as it provides a more targeted and scalable implementation.

19.4.1 Initializing the Database

Before starting to use JDBC Message Store components, it is important to provision target data base with the appropriate objects.

Spring Integration ships with some sample scripts that can be used to initialize a database. In the spring-integration-jdbc JAR file you can find scripts in the org.springframework.integration.jdbc package: there is a create and a drop script example for a range of common database platforms. A common way to use these scripts is to reference them in a Spring JDBC data source initializer. Note that the scripts are provided as samples or specifications of the the required table and column names. You may find that you need to enhance them for production use (e.g. with index declarations).

19.4.2 The Generic JDBC Message Store

The JDBC module provides an implementation of the Spring Integration MessageStore (important in the Claim Check pattern) and MessageGroupStore (important in stateful patterns like Aggregator) backed by a database. Both interfaces are implemented by the JdbcMessageStore, and there is also support for configuring store instances in XML. For example:

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

A JdbcTemplate can be specified instead of a DataSource.

Other optional attributes are show in the next example:

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

Here we have specified a LobHandler for dealing with messages as large objects (e.g. often necessary if using Oracle) and a prefix for the table names in the queries generated by the store. The table name prefix defaults to INT_.

19.4.3 Backing Message Channels

If you intend backing Message Channels using JDBC, it is recommended to use the provided JdbcChannelMessageStore implementation instead. It can only be used in conjunction with Message Channels.

Supported Databases

The JdbcChannelMessageStore uses database specific SQL queries to retrieve messages from the database. Therefore, users must set the ChannelMessageStoreQueryProvider property on the JdbcChannelMessageStore. This channelMessageStoreQueryProvider provides the SQL queries and Spring Integration provides support for the following relational databases:

  • PostgreSQL
  • HSQLDB
  • MySQL
  • Oracle
  • Derby
  • H2
  • SqlServer
  • Sybase
  • DB2

If your database is not listed, you can easily extend the AbstractChannelMessageStoreQueryProvider class and provide your own custom queries.

Since version 4.0, the MESSAGE_SEQUENCE column has been added to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.

Since version 5.0, by overloading ChannelMessageStorePreparedStatementSetter class you can provide custom implementation for message insertion in the JdbcChannelMessageStore. It might be different columns or table structure or serialization strategy. For example, instead of default serialization to byte[], we can store its structure in JSON string.

Below example uses the default implementation of setValues to store common columns and overrides the behavior just to store the message payload as varchar.

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    public JsonPreparedStatementSetter() {
        super();
    }

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}
[Important]Important

Generally it is not recommended to use a relational database for the purpose of queuing. Instead, if possible, consider using either JMS or AMQP backed channels instead. For further reference please see the following resources:

Concurrent Polling

When polling a Message Channel, you have the option to configure the associated Poller with a TaskExecutor reference.

[Important]Important

Keep in mind, though, that if you use a JDBC backed Message Channel and you are planning on polling the channel and consequently the message store transactionally with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue and the performance, when using multiple threads, may not materialize as expected. For example Apache Derby is problematic in that regard.

To achieve better JDBC queue throughput, and avoid issues when different threads may poll the same Message from the queue, it is important to set the usingIdCache property of JdbcChannelMessageStore to true when using databases that do not support MVCC:

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

Priority Channel

Starting with version 4.0, the JdbcChannelMessageStore implements PriorityCapableChannelMessageStore and provides the priorityEnabled option allowing it to be used as a message-store reference for priority-queue s. For this purpose, the INT_CHANNEL_MESSAGE has a MESSAGE_PRIORITY column to store the value of PRIORITY Message header. In addition, a new MESSAGE_SEQUENCE column is also provided to achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond. Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE.

[Note]Note

It’s not recommended to use the same JdbcChannelMessageStore bean for priority and non-priority queue channel, because priorityEnabled option applies to the entire store and proper FIFO queue semantics will not be retained for the queue channel. However the same INT_CHANNEL_MESSAGE table, and even region, can be used for both JdbcChannelMessageStore types. To configure that scenario, simply extend one message store bean from the other:

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

19.4.4 Partitioning a Message Store

It is common to use a JdbcMessageStore as a global store for a group of applications, or nodes in the same application. To provide some protection against name clashes, and to give control over the database meta-data configuration, the message store allows the tables to be partitioned in two ways. One is to use separate table names, by changing the prefix as described above, and the other is to specify a "region" name for partitioning data within a single table. An important use case for this is when the MessageStore is managing persistent queues backing a Spring Integration Message Channel. The message data for a persistent channel is keyed in the store on the channel name, so if the channel names are not globally unique then there is the danger of channels picking up data that was not intended for them. To avoid this, the message store region can be used to keep data separate for different physical channels that happen to have the same logical name.

19.5 Stored Procedures

In certain situations plain JDBC support is not sufficient. Maybe you deal with legacy relational database schemas or you have complex data processing needs, but ultimately you have to use Stored Procedures or Stored Functions. Since Spring Integration 2.1, we provide three components in order to execute Stored Procedures or Stored Functions:

  • Stored Procedures Inbound Channel Adapter
  • Stored Procedures Outbound Channel Adapter
  • Stored Procedures Outbound Gateway

19.5.1 Supported Databases

In order to enable calls to Stored Procedures and Stored Functions, the Stored Procedure components use the org.springframework.jdbc.core.simple.SimpleJdbcCall class. Consequently, the following databases are fully supported for executing Stored Procedures:

  • Apache Derby
  • DB2
  • MySQL
  • Microsoft SQL Server
  • Oracle
  • PostgreSQL
  • Sybase

If you want to execute Stored Functions instead, the following databases are fully supported:

  • MySQL
  • Microsoft SQL Server
  • Oracle
  • PostgreSQL
[Note]Note

Even though your particular database may not be fully supported, chances are, that you can use the Stored Procedure Spring Integration components quite successfully anyway, provided your RDBMS supports Stored Procedures or Functions.

As a matter of fact, some of the provided integration tests use the H2 database. Nevertheless, it is very important to thoroughly test those usage scenarios.

19.5.2 Configuration

The Stored Procedure components provide full XML Namespace support and configuring the components is similar as for the general purpose JDBC components discussed earlier.

19.5.3 Common Configuration Attributes

Certain configuration parameters are shared among all Stored Procedure components and are described below:

auto-startup

Lifecycle attribute signaling if this component should be started during Application Context startup. Defaults to true. Optional.

data-source

Reference to a javax.sql.DataSource, which is used to access the database. Required.

id

Identifies the underlying Spring bean definition, which is an instance of either EventDrivenConsumer or PollingConsumer, depending on whether the Outbound Channel Adapter’s channel attribute references a SubscribableChannel or a PollableChannel. Optional.

ignore-column-meta-data

For fully supported databases, the underlying SimpleJdbcCall class can automatically retrieve the parameter information for the to be invoked Stored Procedure or Function from the JDBC Meta-data.

However, if the used database does not support meta data lookups or if you like to provide customized parameter definitions, this flag can be set to true. It defaults to false. Optional.

is-function

If true, a SQL Function is called. In that case the stored-procedure-name or stored-procedure-name-expression attributes define the name of the called function. Defaults to false. Optional.

stored-procedure-name

The attribute specifies the name of the stored procedure. If the is-function attribute is set to true, this attribute specifies the function name instead. Either this property or stored-procedure-name-expression must be specified.

stored-procedure-name-expression

This attribute specifies the name of the stored procedure using a SpEL expression. Using SpEL you have access to the full message (if available), including its headers and payload. You can use this attribute to invoke different Stored Procedures at runtime. For example, you can provide Stored Procedure names that you would like to execute as a Message Header. The expression must resolve to a String.

If the is-function attribute is set to true, this attribute specifies a Stored Function. Either this property or stored-procedure-name must be specified.

jdbc-call-operations-cache-size

Defines the maximum number of cached SimpleJdbcCallOperations instances. Basically, for each Stored Procedure Name a new SimpleJdbcCallOperations instance is created that in return is being cached.

[Note]Note

The stored-procedure-name-expression attribute and the jdbc-call-operations-cache-size were added with Spring Integration 2.2.

The default cache size is 10. A value of 0 disables caching. Negative values are not permitted.

If you enable JMX, statistical information about the jdbc-call-operations-cache is exposed as MBean. Please see Section 10.2.7, “MBean Exporter” for more information.

sql-parameter-source-factory (Not available for the Stored Procedure Inbound Channel Adapter.)

Reference to a SqlParameterSourceFactory. By default bean properties of the passed in Message payload will be used as a source for the Stored Procedure’s input parameters using a BeanPropertySqlParameterSourceFactory.

This may be sufficient for basic use cases. For more sophisticated options, consider passing in one or more ProcedureParameter. Please also refer to Section 19.5.5, “Defining Parameter Sources”. Optional.

use-payload-as-parameter-source (Not available for the Stored Procedure Inbound Channel Adapter.)

If set to true, the payload of the Message will be used as a source for providing parameters. If false, however, the entire Message will be available as a source for parameters.

If no Procedure Parameters are passed in, this property will default to true. This means that using a default BeanPropertySqlParameterSourceFactory the bean properties of the payload will be used as a source for parameter values for the to-be-executed Stored Procedure or Stored Function.

However, if Procedure Parameters are passed in, then this property will by default evaluate to false. ProcedureParameter allow for SpEL Expressions to be provided and therefore it is highly beneficial to have access to the entire Message. The property is set on the underlying StoredProcExecutor. Optional.

19.5.4 Common Configuration Sub-Elements

The Stored Procedure components share a common set of sub-elements to define and pass parameters to Stored Procedures or Functions. The following elements are available:

  • parameter
  • returning-resultset
  • sql-parameter-definition
  • poller

parameter

Provides a mechanism to provide Stored Procedure parameters. Parameters can be either static or provided using a SpEL Expressions. Optional.

<int-jdbc:parameter name=""     1
                    type=""     2
                    value=""/>  3

<int-jdbc:parameter name=""
                    expression=""/> 4

1

The name of the parameter to be passed into the Stored Procedure or Stored Function. Required.

2

This attribute specifies the type of the value. If nothing is provided this attribute will default to java.lang.String. This attribute is only used when the value attribute is used. Optional.

3

The value of the parameter. You have to provider either this attribute or the expression attribute must be provided instead. Optional.

4

Instead of the value attribute, you can also specify a SpEL expression for passing the value of the parameter. If you specify the expression the value attribute is not allowed. Optional.

returning-resultset

Stored Procedures may return multiple result sets. By setting one or more returning-resultset elements, you can specify RowMappers in order to convert each returned ResultSet to meaningful objects. Optional.

<int-jdbc:returning-resultset name="" row-mapper="" />

sql-parameter-definition

If you are using a database that is fully supported, you typically don’t have to specify the Stored Procedure parameter definitions. Instead, those parameters can be automatically derived from the JDBC Meta-data. However, if you are using databases that are not fully supported, you must set those parameters explicitly using the sql-parameter-definition sub-element.

You can also choose to turn off any processing of parameter meta data information obtained via JDBC using the ignore-column-meta-data attribute.

<int-jdbc:sql-parameter-definition
                                   name=""                           1
                                   direction="IN"                    2
                                   type="STRING"                     3
                                   scale="5"                         4
                                   type-name="FOO_STRUCT"            5
                                   return-type="fooSqlReturnType"/>  6

1

Specifies the name of the SQL parameter. Required.

2

Specifies the direction of the SQL parameter definition. Defaults to IN. Valid values are: IN, OUT and INOUT. If your procedure is returning ResultSets, please use the returning-resultset element. Optional.

3

The SQL type used for this SQL parameter definition. Will translate into the integer value as defined by java.sql.Types. Alternatively you can provide the integer value as well. If this attribute is not explicitly set, then it will default to VARCHAR. Optional.

4

The scale of the SQL parameter. Only used for numeric and decimal parameters. Optional.

5

The typeName for types that are user-named like: STRUCT, DISTINCT, JAVA_OBJECT, named array types. This attribute is mutually exclusive with the scale attribute. Optional.

6

The reference to a custom value handler for complex types. An implementation of SqlReturnType. This attribute is mutually exclusive with the scale attribute and is applicable for OUT(INOUT)-parameters only. Optional.

poller

Allows you to configure a Message Poller if this endpoint is a PollingConsumer. Optional.

19.5.5 Defining Parameter Sources

Parameter Sources govern the techniques of retrieving and mapping the Spring Integration Message properties to the relevant Stored Procedure input parameters. The Stored Procedure components follow certain rules.

By default bean properties of the passed in Message payload will be used as a source for the Stored Procedure’s input parameters. In that case a BeanPropertySqlParameterSourceFactory will be used. This may be sufficient for basic use cases. The following example illustrates that default behavior.

[Important]Important

Please be aware that for the "automatic" lookup of bean properties using the BeanPropertySqlParameterSourceFactory to work, your bean properties must be defined in lower case. This is due to the fact that in org.springframework.jdbc.core.metadata.CallMetaDataContext (method matchInParameterValuesWithCallParameters()), the retrieved Stored Procedure parameter declarations are converted to lower case. As a result, if you have camel-case bean properties such as "lastName", the lookup will fail. In that case, please provide an explicit ProcedureParameter.

Let’s assume we have a payload that consists of a simple bean with the following three properties: id, name and description. Furthermore, we have a simplistic Stored Procedure called INSERT_COFFEE that accepts three input parameters: id, name and description. We also use a fully supported database. In that case the following configuration for a Stored Procedure Outbound Adapter will be sufficient:

<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
    channel="insertCoffeeProcedureRequestChannel"
    stored-procedure-name="INSERT_COFFEE"/>

For more sophisticated options consider passing in one or more ProcedureParameter.

If you do provide ProcedureParameter explicitly, then as default an ExpressionEvaluatingSqlParameterSourceFactory will be used for parameter processing in order to enable the full power of SpEL expressions.

Furthermore, if you need even more control over how parameters are retrieved, consider passing in a custom implementation of a SqlParameterSourceFactory using the sql-parameter-source-factory attribute.

19.5.6 Stored Procedure Inbound Channel Adapter

<int-jdbc:stored-proc-inbound-channel-adapter
                                   channel=""                                    1
                                   stored-procedure-name=""
                                   data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   skip-undeclared-results=""                    2
                                   return-value-required="false"                 3
    <int:poller/>
    <int-jdbc:sql-parameter-definition name="" direction="IN"
                                               type="STRING"
                                               scale=""/>
    <int-jdbc:parameter name="" type="" value=""/>
    <int-jdbc:parameter name="" expression=""/>
    <int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>

1

Channel to which polled messages will be sent. If the stored procedure or function does not return any data, the payload of the Message will be Null. Required.

2

If this attribute is set to true, all results from a stored procedure call that do not have a corresponding SqlOutParameter declaration are bypassed. For example, stored procedures can return an update count value, even though your stored procedure declared only a single result parameter. The exact behavior depends on the database implementation. The value is set on the underlying JdbcTemplate. Few developers will probably ever want to process update counts, thus the value defaults to true. Optional.

3

Indicates whether this procedure’s return value should be included. Since Spring Integration 3.0. Optional.

19.5.7 Stored Procedure Outbound Channel Adapter

<int-jdbc:stored-proc-outbound-channel-adapter channel=""                        1
                                               stored-procedure-name=""
                                               data-source=""
                                               auto-startup="true"
                                               id=""
                                               ignore-column-meta-data="false"
                                               order=""                          2
                                               sql-parameter-source-factory=""
                                               use-payload-as-parameter-source="">
    <int:poller fixed-rate=""/>
    <int-jdbc:sql-parameter-definition name=""/>
    <int-jdbc:parameter name=""/>

</int-jdbc:stored-proc-outbound-channel-adapter>

1

The receiving Message Channel of this endpoint. Required.

2

Specifies the order for invocation when this endpoint is connected as a subscriber to a channel. This is particularly relevant when that channel is using a failover dispatching strategy. It has no effect when this endpoint itself is a Polling Consumer for a channel with a queue. Optional.

19.5.8 Stored Procedure Outbound Gateway

<int-jdbc:stored-proc-outbound-gateway request-channel=""                        1
                                       stored-procedure-name=""
                                       data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   order=""
                                   reply-channel=""                              2
                                   reply-timeout=""                              3
                                   return-value-required="false"                 4
                                   skip-undeclared-results=""                    5
                                   sql-parameter-source-factory=""
                                   use-payload-as-parameter-source="">
<int-jdbc:sql-parameter-definition name="" direction="IN"
                                   type=""
                                   scale="10"/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />

1

The receiving Message Channel of this endpoint. Required.

2

Message Channel to which replies should be sent, after receiving the database response. Optional.

3

Allows you to specify how long this gateway will wait for the reply message to be sent successfully before throwing an exception. Keep in mind that when sending to a DirectChannel, the invocation will occur in the sender’s thread so the failing of the send operation may be caused by other components further downstream. By default the Gateway will wait indefinitely. The value is specified in milliseconds. Optional.

4

Indicates whether this procedure’s return value should be included. Optional.

5

If the skip-undeclared-results attribute is set to true, then all results from a stored procedure call that don’t have a corresponding SqlOutParameter declaration will be bypassed. E.g. Stored Procedures may return an update count value, even though your Stored Procedure only declared a single result parameter. The exact behavior depends on the used database. The value is set on the underlying JdbcTemplate. Few developers will probably ever want to process update counts, thus the value defaults to true. Optional.

19.5.9 Examples

In the following two examples we call Apache Derby Stored Procedures. The first procedure will call a Stored Procedure that returns a ResultSet, and using a RowMapper the data is converted into a domain object, which then becomes the Spring Integration message payload.

In the second sample we call a Stored Procedure that uses Output Parameters instead, in order to return data.

[Note]Note

Please have a look at the Spring Integration Samples project, located at null

The project contains the Apache Derby example referenced here, as well as instruction on how to run it. The Spring Integration Samples project also provides an example using Oracle Stored Procedures.

In the first example, we call a Stored Procedure named FIND_ALL_COFFEE_BEVERAGES that does not define any input parameters but which returns a ResultSet.

In Apache Derby, Stored Procedures are implemented using Java. Here is the method signature followed by the corresponding Sql:

public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
            throws SQLException {
    ...
}
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';

In Spring Integration, you can now call this Stored Procedure using e.g. a stored-proc-outbound-gateway

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
                                       data-source="dataSource"
                                       request-channel="findAllProcedureRequestChannel"
                                       expect-single-result="true"
                                       stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
    row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>

In the second example, we call a Stored Procedure named FIND_COFFEE that has one input parameter. Instead of returning a ResultSet, an output parameter is used:

public static void findCoffee(int coffeeId, String[] coffeeDescription)
            throws SQLException {
    ...
}
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';

In Spring Integration, you can now call this Stored Procedure using e.g. a stored-proc-outbound-gateway

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
                                       data-source="dataSource"
                                       request-channel="findCoffeeProcedureRequestChannel"
                                       skip-undeclared-results="true"
                                       stored-procedure-name="FIND_COFFEE"
                                       expect-single-result="true">
    <int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>

19.6 JDBC Lock Registry

Starting with version 4.3, the JdbcLockRegistry is available. Certain components (for example aggregator and resequencer) use a lock obtained from a LockRegistry instance to ensure that only one thread is manipulating a group at a time. The DefaultLockRegistry performs this function within a single component; you can now configure an external lock registry on these components. When used with a shared MessageGroupStore, the JdbcLockRegistry can be use to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.

When a lock is released by a local thread, another local thread will generally be able to acquire the lock immediately. If a lock is released by a thread using a different registry instance, it can take up to 100ms to acquire the lock.

The JdbcLockRegistry is based on the LockRepository abstraction, where a DefaultLockRepository implementation is present. The data base schema scripts are located in the org.springframework.integration.jdbc package divided to the particular RDBMS vendors. For example the H2 DDL for lock table looks like:

CREATE TABLE INT_LOCK  (
    LOCK_KEY CHAR(36),
    REGION VARCHAR(100),
    CLIENT_ID CHAR(36),
    CREATED_DATE TIMESTAMP NOT NULL,
    constraint LOCK_PK primary key (LOCK_KEY, REGION)
);

The INT_ can be changed according to the target data base design requirements. Therefore prefix property must be used on the DefaultLockRepository bean definition.

Sometimes it happens that one application has moved to the state when it can’t release distributed lock - remove the particular record in the data base. For this purpose such dead locks can be expired by the other application on the next locking invocation. The timeToLive (TTL) option on the DefaultLockRepository is provided for this purpose. The user may also want to specify CLIENT_ID for the locks stored for a given DefaultLockRepository instance. In this case you can specify the id to be associated with the DefaultLockRepository as a constructor parameter.

19.7 JDBC Metadata Store

Starting with version 5.0, the JDBC MetadataStore (Section 10.5, “Metadata Store”) implementation is available. The JdbcMetadataStore can be used to maintain metadata state across application restarts. This MetadataStore implementation can be used with adapters such as:

In order to configure these adapters to use the JdbcMetadataStore, simply declare a Spring bean using the bean name metadataStore. The Twitter Inbound Channel Adapter and the Feed Inbound Channel Adapter will both automatically pick up and use the declared JdbcMetadataStore:

@Bean
public MetadataStore metadataStore(DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
}

Data base schema scripts for several RDMBS vendors are located in the org.springframework.integration.jdbc package. For example the H2 DDL for metadata table looks like:

CREATE TABLE INT_METADATA_STORE  (
	METADATA_KEY VARCHAR(255) NOT NULL,
	METADATA_VALUE VARCHAR(4000),
	REGION VARCHAR(100) NOT NULL,
	constraint METADATA_STORE primary key (METADATA_KEY, REGION)
);

The INT_ prefix can be changed according to the target data base design requirements and the JdbcMetadataStore can be configured to use the custom prefix.

The JdbcMetadataStore implements ConcurrentMetadataStore, allowing it to be reliably shared across multiple application instances where only one instance will be allowed to store or modify a key’s value. All of these operations are atomic via transaction guarantees.

Transaction management is required to use JdbcMetadataStore. Inbound Channel Adapters can be supplied with a reference to the TransactionManager in the poller configuration. Unlike non-transactional MetadataStore implementations, with JdbcMetadataStore, the entry appears in the target table only after the transaction commits. When a rollback occurs, no entries are added to the INT_METADATA_STORE table.

Since version 5.0.7, the JdbcMetadataStore can be configured with the RDBMS vendor-specific lockHint option for lock-based queries on metadata store entries. It is FOR UPDATE by default and can be configured with an empty string, if the target data base doesn’t support row locking functionality. Please, consult with your vendor for particular possible hint in the SELECT expression for locking rows before updates.