JDBC Support
Spring Integration provides channel adapters for receiving and sending messages by using database queries. Through those adapters, Spring Integration supports not only plain JDBC SQL queries but also stored procedure and stored function calls.
You need to include this dependency in your project:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<version>6.2.0-M1</version>
</dependency>
compile "org.springframework.integration:spring-integration-jdbc:6.2.0-M1"
By default, the following JDBC components are available:
The Spring Integration JDBC Module also provides a JDBC Message Store.
Inbound Channel Adapter
The main function of an inbound channel adapter is to execute a SQL SELECT
query and turn the result set into a message.
The message payload is the whole result set (expressed as a List
), and the types of the items in the list depend on the row-mapping strategy.
The default strategy is a generic mapper that returns a Map
for each row in the query result.
Optionally, you can change this by adding a reference to a RowMapper
instance (see the Spring JDBC documentation for more detailed information about row mapping).
If you want to convert rows in the SELECT query result to individual messages, you can use a downstream splitter.
The inbound adapter also requires a reference to either a JdbcTemplate
instance or a DataSource
.
As well as the SELECT
statement to generate the messages, the adapter also has an UPDATE
statement that marks the records as processed so that they do not show up in the next poll.
The update can be parameterized by the list of IDs from the original select.
By default, this is done through a naming convention (a column in the input result set called id
is translated into a list in the parameter map for the update called id
).
The following example defines an inbound channel adapter with an update query and a DataSource
reference.
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
channel="target" data-source="dataSource"
update="update item set status=10 where id in (:id)" />
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set).
This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration.
The underlying Spring JDBC features limit the available expressions (for example, most special characters other than a period are disallowed), but since the target is usually a list of objects (possibly a list of one) that are addressable by bean paths this is not unduly restrictive.
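The naming convention described above can be pictured with a small, library-free sketch. It is only an illustration under stated assumptions: `IdProjectionSketch`, `project`, and `expand` are hypothetical names, not Spring Integration or Spring JDBC APIs. The sketch collects the `id` column from every polled row and then expands the single `:id` parameter into one placeholder per value, which is how collection-valued named parameters end up in an `in (...)` clause.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spring Integration: models the "id" naming convention.
final class IdProjectionSketch {

    // Collects one column from every polled row; this is the "projection" onto the result list.
    static List<Object> project(List<Map<String, Object>> rows, String column) {
        List<Object> values = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            values.add(row.get(column));
        }
        return values;
    }

    // Replaces ":name" with one "?" placeholder per collected value.
    static String expand(String sql, String name, List<Object> values) {
        String placeholders = String.join(", ", Collections.nCopies(values.size(), "?"));
        return sql.replace(":" + name, placeholders);
    }

    public static void main(String[] args) {
        // Three polled rows, each represented as a column-name-to-value map.
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int id = 1; id <= 3; id++) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("id", id);
            row.put("status", 2);
            rows.add(row);
        }
        List<Object> ids = project(rows, "id");
        System.out.println(expand("update item set status=10 where id in (:id)", "id", ids));
        System.out.println(ids);
    }
}
```

With three polled rows, the update statement carries three placeholders that are bound to the collected `id` values.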
To change the parameter generation strategy, you can inject a SqlParameterSourceFactory
into the adapter to override the default behavior (the adapter has a sql-parameter-source-factory
attribute).
Spring Integration provides ExpressionEvaluatingSqlParameterSourceFactory
, which creates a SpEL-based parameter source, with the results of the query as the #root
object.
(If update-per-row
is true, the root object is the row).
If the same parameter name appears multiple times in the update query, it is evaluated only once, and its result is cached.
You can also use a parameter source for the select query. In this case, since there is no “result” object to evaluate against, a single parameter source is used each time (rather than using a parameter source factory). Starting with version 4.0, you can use Spring to create a SpEL based parameter source, as the following example shows:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="status" value="@statusBean.which()" />
</map>
</property>
</bean>
<bean id="statusBean" class="foo.StatusDetermination" />
The value
in each parameter expression can be any valid SpEL expression.
The #root
object for the expression evaluation is the constructor argument defined on the parameterSource
bean.
It is static for all evaluations (in the preceding example, an empty String
).
Starting with version 5.0, you can supply ExpressionEvaluatingSqlParameterSourceFactory
with sqlParameterTypes
to specify the target SQL type for the particular parameter.
The following example provides SQL types for the parameters being used in the query:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="sqlParameterTypes">
<map>
<entry key="status" value="#{ T(java.sql.Types).BINARY}" />
</map>
</property>
</bean>
Use the createParameterSourceNoCache factory method.
Otherwise, the parameter source caches the result of the evaluation.
Also note that, because caching is disabled, if the same parameter name appears in the select query multiple times, it is re-evaluated for each occurrence.
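The difference between a caching and a non-caching parameter source can be sketched without any Spring classes. The following is a simplified model under stated assumptions: `ParameterSourceSketch` is a hypothetical class, and a `Supplier` stands in for a SpEL expression such as `@statusBean.which()`. With caching enabled, the expression is evaluated once and every later poll reuses the first value. With caching disabled, every lookup evaluates the expression again.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of a parameter source; it is not the Spring Integration class.
final class ParameterSourceSketch {

    private final Map<String, Supplier<Object>> expressions;
    private final Map<String, Object> cache = new HashMap<>();
    private final boolean cacheEnabled;

    ParameterSourceSketch(Map<String, Supplier<Object>> expressions, boolean cacheEnabled) {
        this.expressions = expressions;
        this.cacheEnabled = cacheEnabled;
    }

    // Returns the parameter value, re-evaluating the expression unless a cached value exists.
    Object getValue(String name) {
        if (cacheEnabled && cache.containsKey(name)) {
            return cache.get(name);
        }
        Object value = expressions.get(name).get();
        if (cacheEnabled) {
            cache.put(name, value);
        }
        return value;
    }

    public static void main(String[] args) {
        int[] evaluations = {0};
        Map<String, Supplier<Object>> expressions = new HashMap<>();
        // The "expression" counts how often it is evaluated.
        expressions.put("status", () -> ++evaluations[0]);

        ParameterSourceSketch cached = new ParameterSourceSketch(expressions, true);
        System.out.println(cached.getValue("status") + " " + cached.getValue("status"));

        ParameterSourceSketch uncached = new ParameterSourceSketch(expressions, false);
        System.out.println(uncached.getValue("status") + " " + uncached.getValue("status"));
    }
}
```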
Polling and Transactions
The inbound adapter accepts a regular Spring Integration poller as a child element. Consequently, the frequency of the polling can be controlled (among other uses). An important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, as the following example shows:
<int-jdbc:inbound-channel-adapter query="..."
channel="target" data-source="dataSource" update="...">
<int:poller fixed-rate="1000">
<int:transactional/>
</int:poller>
</int-jdbc:inbound-channel-adapter>
If you do not explicitly specify a poller, a default value is used. As is normal with Spring Integration, it can be defined as a top-level bean.
In the preceding example, the database is polled every 1000 milliseconds (or once a second), and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown. However, as long as it is aware of the data source, the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.
max-rows
Versus max-messages-per-poll
The JDBC inbound channel adapter defines an attribute called max-rows
.
When you specify the adapter’s poller, you can also define a property called max-messages-per-poll
.
While these two attributes look similar, their meaning is quite different.
max-messages-per-poll
specifies the number of times the query is executed per polling interval, whereas max-rows
specifies the number of rows returned for each execution.
Under normal circumstances, you would likely not want to set the poller’s max-messages-per-poll
property when you use the JDBC inbound channel adapter.
Its default value is 1
, which means that the JDBC inbound channel adapter’s receive()
method is executed exactly once for each poll interval.
Setting the max-messages-per-poll
attribute to a larger value means that the query is executed that many times back to back.
For more information regarding the max-messages-per-poll
attribute, see Configuring An Inbound Channel Adapter.
In contrast, the max-rows
attribute, if greater than 0
, specifies the maximum number of rows to be used from the query result set created by the receive()
method.
If the attribute is set to 0
, all rows are included in the resulting message.
The attribute defaults to 0
.
We recommend limiting the result set through vendor-specific query options, such as MySQL LIMIT, SQL Server TOP, or Oracle ROWNUM.
See the particular vendor documentation for more information.
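The interplay of the two settings can be illustrated with a small, library-free simulation. This is a simplified model, not the adapter's implementation: `PollSimulation` and `pollOnce` are hypothetical names, and the model assumes an update query that marks selected rows as processed (represented here by removing them from a queue). Each query execution yields one message, and each message carries at most `maxRows` rows.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Simplified, hypothetical model of one polling interval; it does not use any Spring classes.
final class PollSimulation {

    // Runs the "query" up to maxMessagesPerPoll times. Each run takes at most maxRows rows
    // (0 means no limit) and removes them, standing in for the update that marks rows as processed.
    static List<List<Integer>> pollOnce(Deque<Integer> unprocessed, int maxMessagesPerPoll, int maxRows) {
        List<List<Integer>> messages = new ArrayList<>();
        for (int execution = 0; execution < maxMessagesPerPoll; execution++) {
            List<Integer> payload = new ArrayList<>();
            while (!unprocessed.isEmpty() && (maxRows == 0 || payload.size() < maxRows)) {
                payload.add(unprocessed.removeFirst());
            }
            if (payload.isEmpty()) {
                break; // nothing left to select, so no further message is produced
            }
            messages.add(payload);
        }
        return messages;
    }

    public static void main(String[] args) {
        Deque<Integer> table = new ArrayDeque<>(Arrays.asList(1, 2, 3, 4, 5));
        // One execution per interval, two rows per message.
        System.out.println(pollOnce(table, 1, 2));
        // Up to three executions in the next interval, still two rows per message.
        System.out.println(pollOnce(table, 3, 2));
    }
}
```

In this model, raising `maxMessagesPerPoll` produces more messages per interval, while raising `maxRows` produces larger payloads.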
Outbound Channel Adapter
The outbound channel adapter is the inverse of the inbound: its role is to handle a message and use it to execute a SQL query. By default, the message payload and headers are available as input parameters to the query, as the following example shows:
<int-jdbc:outbound-channel-adapter
query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
data-source="dataSource"
channel="input"/>
In the preceding example, messages arriving at the channel labelled input
have a payload of a map with a key of something
, so the []
operator dereferences that value from the map.
The headers are also accessed as a map.
The parameters in the preceding query are bean property expressions on the incoming message (not SpEL expressions).
This behavior is part of the SqlParameterSource, which is the default source created by the outbound adapter.
You can inject a different SqlParameterSourceFactory to get different behavior.
The outbound adapter requires a reference to either a DataSource
or a JdbcTemplate
.
You can also inject a SqlParameterSourceFactory
to control the binding of each incoming message to a query.
If the input channel is a direct channel, the outbound adapter runs its query in the same thread and, therefore, the same transaction (if there is one) as the sender of the message.
Passing Parameters by Using SpEL Expressions
A common requirement for most JDBC channel adapters is to pass parameters as part of SQL queries or stored procedures or functions.
As mentioned earlier, these parameters are by default bean property expressions, not SpEL expressions.
However, if you need to pass SpEL expressions as parameters, you must explicitly inject a SqlParameterSourceFactory
.
The following example uses an ExpressionEvaluatingSqlParameterSourceFactory
to achieve that requirement:
<int-jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)"
sql-parameter-source-factory="spelSource"/>
<bean id="spelSource"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="id" value="headers['id'].toString()"/>
<entry key="createdDate" value="new java.util.Date()"/>
<entry key="payload" value="payload"/>
</map>
</property>
</bean>
For further information, see Defining Parameter Sources.
Using the PreparedStatement
Callback
Sometimes, the flexibility and loose-coupling of SqlParameterSourceFactory
does not do what we need for the target PreparedStatement
or we need to do some low-level JDBC work.
The Spring JDBC module provides APIs to configure the execution environment (such as ConnectionCallback
or PreparedStatementCreator
) and manipulate parameter values (such as SqlParameterSource
).
It can even access APIs for low-level operations, such as StatementCallback
.
Starting with Spring Integration 4.2, MessagePreparedStatementSetter
allows the specification of parameters on the PreparedStatement
manually, in the requestMessage
context.
This class plays exactly the same role as PreparedStatementSetter
in the standard Spring JDBC API.
Actually, it is invoked directly from an inline PreparedStatementSetter
implementation when the JdbcMessageHandler
invokes execute
on the JdbcTemplate
.
This functional interface option is mutually exclusive with sqlParameterSourceFactory
and can be used as a more powerful alternative to populate parameters of the PreparedStatement
from the requestMessage
.
For example, it is useful when we need to store File
data to the database BLOB
column in a streaming manner.
The following example shows how to do so:
@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
"INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME, String.class));
try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
ps.setBlob(2, inputStream);
}
catch (Exception e) {
throw new MessageHandlingException(m, e);
}
ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
});
return jdbcMessageHandler;
}
From the XML configuration perspective, the prepared-statement-setter
attribute is available on the <int-jdbc:outbound-channel-adapter>
component.
It lets you specify a MessagePreparedStatementSetter
bean reference.
Batch Update
Starting with version 5.1, the JdbcMessageHandler
performs a JdbcOperations.batchUpdate()
if the payload of the request message is an Iterable
instance.
Each element of the Iterable
is wrapped in a Message
with the headers from the request message if such an element is not a Message
already.
In the case of regular SqlParameterSourceFactory
-based configuration these messages are used to build an SqlParameterSource[]
for an argument used in the mentioned JdbcOperations.batchUpdate()
function.
When a MessagePreparedStatementSetter
configuration is applied, a BatchPreparedStatementSetter
variant is used to iterate over those messages for each item and the provided MessagePreparedStatementSetter
is called against them.
The batch update is not supported when keysGenerated
mode is selected.
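The wrapping step described above can be sketched in plain Java. This is a hedged illustration only: `BatchWrapSketch`, `Msg`, and `toBatch` are hypothetical stand-ins and do not reflect the actual JdbcMessageHandler code. The sketch shows how each element of an Iterable payload becomes its own message that reuses the request headers, unless the element is already a message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical model of the batch preparation step; it does not use any Spring classes.
final class BatchWrapSketch {

    // Minimal stand-in for a message: a payload plus headers.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Turns each element of an Iterable payload into its own message, reusing the request
    // headers unless the element already is a message.
    static List<Msg> toBatch(Msg request) {
        List<Msg> batch = new ArrayList<>();
        for (Object element : (Iterable<?>) request.payload) {
            batch.add(element instanceof Msg ? (Msg) element : new Msg(element, request.headers));
        }
        return batch;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Collections.<String, Object>singletonMap("source", "orders");
        Msg request = new Msg(Arrays.asList("a", "b"), headers);
        for (Msg item : toBatch(request)) {
            // Each batch item would then supply the parameters for one statement in the batch.
            System.out.println(item.payload + " " + item.headers);
        }
    }
}
```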
Outbound Gateway
The outbound gateway is like a combination of the outbound and inbound adapters: Its role is to handle a message and use it to execute a SQL query and then respond with the result by sending it to a reply channel. By default, the message payload and headers are available as input parameters to the query, as the following example shows:
<int-jdbc:outbound-gateway
update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource" />
The result of the preceding example is to insert a record into the mythings
table and return a message that indicates the number of rows affected (the payload is a map: {UPDATED=1}
) to the output channel.
If the update query is an insert with auto-generated keys, you can populate the reply message with the generated keys by adding keys-generated="true"
to the preceding example (this is not the default because it is not supported by some database platforms).
The following example shows the changed configuration:
<int-jdbc:outbound-gateway
update="insert into mythings (status, name) values (0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource"
keys-generated="true"/>
Instead of the update count or the generated keys, you can also provide a select query to execute and generate a reply message from the result (as the inbound adapter does), as the following example shows:
<int-jdbc:outbound-gateway
update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
query="select * from foos where id=:headers[$id]"
request-channel="input" reply-channel="output" data-source="dataSource"/>
Since Spring Integration 2.2, the update SQL query is no longer mandatory.
You can now provide only a select query, by using either the query
attribute or the query
element.
This is extremely useful if you need to actively retrieve data by using, for example, a generic gateway or a payload enricher.
The reply message is then generated from the result (similar to how the inbound adapter works) and passed to the reply channel.
The following example shows how to use the query
attribute:
<int-jdbc:outbound-gateway
query="select * from foos where id=:headers[id]"
request-channel="input"
reply-channel="output"
data-source="dataSource"/>
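By default, the component for the SELECT query returns only one (the first) row from the cursor. You can adjust this behavior with the max-rows option. If you need to return all the rows from the SELECT, consider specifying max-rows="0".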
As with the channel adapters, you can also provide SqlParameterSourceFactory
instances for request and reply.
The default is the same as for the outbound adapter, so the request message is available as the root of an expression.
If keys-generated="true"
, the root of the expression is the generated keys (a map if there is only one or a list of maps if multi-valued).
The outbound gateway requires a reference to either a DataSource
or a JdbcTemplate
.
It can also have a SqlParameterSourceFactory
injected to control the binding of the incoming message to the query.
Starting with version 4.2, the request-prepared-statement-setter
attribute is available on the <int-jdbc:outbound-gateway>
as an alternative to request-sql-parameter-source-factory
.
It lets you specify a MessagePreparedStatementSetter
bean reference, which implements more sophisticated PreparedStatement
preparation before its execution.
Starting with version 6.0, the JdbcOutboundGateway
returns an empty list result as is instead of converting it to null
(which meant "no reply"), as it did before.
The previous behavior required extra configuration in applications where handling of empty lists is a part of downstream logic.
See Splitter Discard Channel for possible empty list handling option.
See Outbound Channel Adapter for more information about MessagePreparedStatementSetter
.
JDBC Message Store
Spring Integration provides two JDBC specific message store implementations.
The JdbcMessageStore
is suitable for use with aggregators and the claim check pattern.
The JdbcChannelMessageStore
implementation provides a more targeted and scalable solution specifically for message channels.
Note that, although you can use a JdbcMessageStore
to back a message channel, JdbcChannelMessageStore
is optimized for that purpose.
Starting with versions 5.0.11, 5.1.2, the indexes for the JdbcChannelMessageStore have been optimized.
If you have large message groups in such a store, you may wish to alter the indexes.
Furthermore, the index for PriorityChannel is commented out because it is not needed unless you are using such channels backed by JDBC.
When using the OracleChannelMessageStoreQueryProvider, the priority channel index must be added because it is included in a hint in the query.
Initializing the Database
Before starting to use JDBC message store components, you should provision a target database with the appropriate objects.
Spring Integration ships with some sample scripts that can be used to initialize a database.
In the spring-integration-jdbc
JAR file, you can find scripts in the org.springframework.integration.jdbc
package.
It provides an example create and an example drop script for a range of common database platforms.
A common way to use these scripts is to reference them in a Spring JDBC data source initializer.
Note that the scripts are provided as samples and as specifications of the required table and column names.
You may find that you need to enhance them for production use (for example, by adding index declarations).
The Generic JDBC Message Store
The JDBC module provides an implementation of the Spring Integration MessageStore
(important in the claim check pattern) and MessageGroupStore
(important in stateful patterns such as an aggregator) backed by a database.
Both interfaces are implemented by the JdbcMessageStore
, and there is support for configuring store instances in XML, as the following example shows:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
You can specify a JdbcTemplate
instead of a DataSource
.
The following example shows some other optional attributes:
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
In the preceding example, we have specified a LobHandler
for dealing with messages as large objects (which is often necessary for Oracle) and a prefix for the table names in the queries generated by the store.
The table name prefix defaults to INT_
.
Backing Message Channels
If you intend to back message channels with JDBC, we recommend using the JdbcChannelMessageStore
implementation.
It works only in conjunction with Message Channels.
Supported Databases
The JdbcChannelMessageStore
uses database-specific SQL queries to retrieve messages from the database.
Therefore, you must set the ChannelMessageStoreQueryProvider
property on the JdbcChannelMessageStore
.
This channelMessageStoreQueryProvider
provides the SQL queries for the particular database you specify.
Spring Integration provides support for the following relational databases:
- PostgreSQL
- HSQLDB
- MySQL
- Oracle
- Derby
- H2
- SqlServer
- Sybase
- DB2
If your database is not listed, you can extend the AbstractChannelMessageStoreQueryProvider
class and provide your own custom queries.
Version 4.0 added the MESSAGE_SEQUENCE
column to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.
Custom Message Insertion
Since version 5.0, by extending the ChannelMessageStorePreparedStatementSetter
class, you can provide a custom implementation for message insertion in the JdbcChannelMessageStore
.
You can use it to set different columns or change the table structure or serialization strategy.
For example, instead of default serialization to byte[]
, you can store its structure as a JSON string.
The following example uses the default implementation of setValues
to store common columns and overrides the behavior to store the message payload as a varchar
:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
Generally, we do not recommend using a relational database for queuing. If possible, consider using either JMS- or AMQP-backed channels instead. If you still plan to use your database as a queue, consider using PostgreSQL and its notification mechanism, which is described in a subsequent section.
Concurrent Polling
When polling a message channel, you have the option to configure the associated Poller
with a TaskExecutor
reference.
Keep in mind, though, that if you use a JDBC backed message channel and you plan to poll the channel and consequently the message store transactionally with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue and the performance, when using multiple threads, may not materialize as expected. For example, Apache Derby is problematic in that regard. To achieve better JDBC queue throughput and avoid issues when different threads may poll the same
Priority Channel
Starting with version 4.0, JdbcChannelMessageStore
implements PriorityCapableChannelMessageStore
and provides the priorityEnabled
option, letting it be used as a message-store
reference for priority-queue
instances.
For this purpose, the INT_CHANNEL_MESSAGE
table has a MESSAGE_PRIORITY
column to store the value of PRIORITY
message headers.
In addition, a new MESSAGE_SEQUENCE
column lets us achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond.
Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
.
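The ordering clause above can be mirrored with a plain Java comparator to make the polling order concrete. This is an illustrative sketch under stated assumptions: `PriorityOrderSketch` and `Row` are hypothetical stand-ins for rows of the INT_CHANNEL_MESSAGE table, and only the three ordering columns are modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the polling order; it does not touch a database.
final class PriorityOrderSketch {

    // Stand-in for one INT_CHANNEL_MESSAGE row; only the ordering columns are modeled.
    static final class Row {
        final String id;
        final Integer priority;  // MESSAGE_PRIORITY, may be null
        final long createdDate;  // CREATED_DATE as epoch milliseconds
        final long sequence;     // MESSAGE_SEQUENCE

        Row(String id, Integer priority, long createdDate, long sequence) {
            this.id = id;
            this.priority = priority;
            this.createdDate = createdDate;
            this.sequence = sequence;
        }
    }

    // Mirrors: order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
    static final Comparator<Row> POLL_ORDER =
            Comparator.<Row, Integer>comparing(r -> r.priority,
                            Comparator.nullsLast(Comparator.<Integer>reverseOrder()))
                    .thenComparingLong(r -> r.createdDate)
                    .thenComparingLong(r -> r.sequence);

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("a", null, 100L, 1L)); // no priority header
        rows.add(new Row("b", 5, 100L, 3L));    // same priority and millisecond as "c"
        rows.add(new Row("c", 5, 100L, 2L));
        rows.add(new Row("d", 9, 200L, 4L));    // highest priority, stored last
        rows.sort(POLL_ORDER);
        for (Row row : rows) {
            System.out.println(row.id);
        }
    }
}
```

Rows "b" and "c" share a priority and a creation millisecond, so only the sequence column keeps them in insertion order.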
We do not recommend using the same JdbcChannelMessageStore bean for priority and non-priority queue channels, because the priorityEnabled option applies to the entire store and proper FIFO queue semantics are not retained for the queue channel.
However, the same INT_CHANNEL_MESSAGE table (and even region) can be used for both JdbcChannelMessageStore types.
To configure that scenario, you can extend one message store bean from the other, as the following example shows:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
Partitioning a Message Store
It is common to use a JdbcMessageStore
as a global store for a group of applications or nodes in the same application.
To provide some protection against name clashes and to give control over the database meta-data configuration, the message store lets the tables be partitioned in two ways.
One way is to use separate table names, by changing the prefix (as described earlier).
The other way is to specify a region
name for partitioning data within a single table.
An important use case for the second approach is when the MessageStore
is managing persistent queues that back a Spring Integration Message Channel.
The message data for a persistent channel is keyed in the store on the channel name.
Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them.
To avoid this danger, you can use the message store region
to keep data separate for different physical channels that have the same logical name.
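The effect of a region can be pictured with a small in-memory model. This is a sketch under stated assumptions, not the store's implementation: `RegionStoreSketch` is a hypothetical class, and a shared map stands in for the single database table. Two stores with different regions share the same "table", but a channel named `orders` in one region never sees the data of the `orders` channel in the other.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of region-based partitioning within one shared table.
final class RegionStoreSketch {

    private final Map<String, Deque<String>> sharedTable;
    private final String region;

    RegionStoreSketch(Map<String, Deque<String>> sharedTable, String region) {
        this.sharedTable = sharedTable;
        this.region = region;
    }

    // Stores a message under the combination of region and channel name.
    void add(String channelName, String message) {
        sharedTable.computeIfAbsent(key(channelName), k -> new ArrayDeque<>()).addLast(message);
    }

    // Returns the oldest message for this region and channel, or null if there is none.
    String poll(String channelName) {
        Deque<String> queue = sharedTable.get(key(channelName));
        return queue == null ? null : queue.pollFirst();
    }

    private String key(String channelName) {
        return region + "/" + channelName;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> table = new HashMap<>();
        RegionStoreSketch billing = new RegionStoreSketch(table, "BILLING");
        RegionStoreSketch shipping = new RegionStoreSketch(table, "SHIPPING");

        billing.add("orders", "invoice-1");
        System.out.println(shipping.poll("orders")); // other region, same channel name
        System.out.println(billing.poll("orders"));
    }
}
```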
PostgreSQL: Receiving Push Notifications
PostgreSQL offers a listen and notification framework for receiving push notifications upon database table manipulations.
Spring Integration leverages this mechanism (starting with version 6.0) to allow for receiving push notifications when new messages are added to a JdbcChannelMessageStore
.
When using this feature, a database trigger must be defined, which can be found as part of the comments of the schema-postgresql.sql
file which is included in the JDBC module of Spring Integration.
Push notifications are received via the PostgresChannelMessageTableSubscriber
class which allows its subscribers to receive a callback upon the arrival of new messages for any given region
and groupId
.
These notifications are received even if a message was appended on a different JVM, but to the same database.
The PostgresSubscribableChannel
implementation uses a PostgresChannelMessageTableSubscriber.Subscription
contract to pull messages from the store in response to notifications from the mentioned PostgresChannelMessageTableSubscriber
.
For example, push notifications for some group
can be received as follows:
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
Transaction support
Starting with version 6.0.5, specifying a PlatformTransactionManager
on a PostgresSubscribableChannel
will notify subscribers in a transaction.
An exception in a subscriber will cause the transaction to be rolled back and the message to be put back in the message store.
Transactional support is not activated by default.
Retries
Starting with version 6.0.5, a retry policy can be specified by providing a RetryTemplate
to the PostgresSubscribableChannel
.
By default, no retries are performed.
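Any active PostgresChannelMessageTableSubscriber occupies an exclusive JDBC connection for the duration of its active life cycle, so this connection should not originate from a pooling DataSource. Because of this need for an exclusive connection, it is also recommended that a JVM runs only a single PostgresChannelMessageTableSubscriber, which can be used to register any number of subscriptions.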
Stored Procedures
In certain situations, plain JDBC support is not sufficient. Maybe you deal with legacy relational database schemas or you have complex data processing needs, but, ultimately, you have to use stored procedures or stored functions. Since Spring Integration 2.1, we provide three components to execute stored procedures or stored functions:
- Stored Procedures Inbound Channel Adapter
- Stored Procedures Outbound Channel Adapter
- Stored Procedures Outbound Gateway
Supported Databases
In order to enable calls to stored procedures and stored functions, the stored procedure components use the org.springframework.jdbc.core.simple.SimpleJdbcCall
class.
Consequently, the following databases are fully supported for executing stored procedures:
- Apache Derby
- DB2
- MySQL
- Microsoft SQL Server
- Oracle
- PostgreSQL
- Sybase
If you want to execute stored functions instead, the following databases are fully supported:
- MySQL
- Microsoft SQL Server
- Oracle
- PostgreSQL
Even though your particular database may not be fully supported, chances are that you can use the stored procedure Spring Integration components quite successfully anyway, provided your RDBMS supports stored procedures or stored functions. As a matter of fact, some provided integration tests use the H2 database. Nevertheless, it is very important to thoroughly test those usage scenarios.
Configuration
The stored procedure components provide full XML Namespace support, and configuring the components is similar to configuring the general purpose JDBC components discussed earlier.
Common Configuration Attributes
All stored procedure components share certain configuration parameters:
- auto-startup: Lifecycle attribute signaling whether this component should be started during application context startup. It defaults to true. Optional.
- data-source: Reference to a javax.sql.DataSource, which is used to access the database. Required.
- id: Identifies the underlying Spring bean definition, which is an instance of either EventDrivenConsumer or PollingConsumer, depending on whether the outbound channel adapter’s channel attribute references a SubscribableChannel or a PollableChannel. Optional.
- ignore-column-meta-data: For fully supported databases, the underlying SimpleJdbcCall class can automatically retrieve the parameter information for the stored procedure or stored function from the JDBC metadata.
  However, if the database does not support metadata lookups or if you need to provide customized parameter definitions, this flag can be set to true. It defaults to false. Optional.
- is-function: If true, a SQL function is called. In that case, the stored-procedure-name or stored-procedure-name-expression attribute defines the name of the called function. It defaults to false. Optional.
- stored-procedure-name: This attribute specifies the name of the stored procedure. If the is-function attribute is set to true, this attribute specifies the function name instead. Either this property or stored-procedure-name-expression must be specified.
- stored-procedure-name-expression: This attribute specifies the name of the stored procedure by using a SpEL expression. By using SpEL, you have access to the full message (if available), including its headers and payload. You can use this attribute to invoke different stored procedures at runtime. For example, you can provide stored procedure names that you would like to execute as a message header. The expression must resolve to a String.
  If the is-function attribute is set to true, this attribute specifies a stored function. Either this property or stored-procedure-name must be specified.
- jdbc-call-operations-cache-size: Defines the maximum number of cached SimpleJdbcCallOperations instances. Basically, for each stored procedure name, a new SimpleJdbcCallOperations instance is created that, in turn, is cached.
  Spring Integration 2.2 added the stored-procedure-name-expression attribute and the jdbc-call-operations-cache-size attribute.
  The default cache size is 10. A value of 0 disables caching. Negative values are not permitted.
  If you enable JMX, statistical information about the jdbc-call-operations-cache is exposed as an MBean. See MBean Exporter for more information.
- sql-parameter-source-factory: (Not available for the stored procedure inbound channel adapter.) Reference to a SqlParameterSourceFactory. By default, bean properties of the passed-in Message payload are used as a source for the stored procedure’s input parameters by using a BeanPropertySqlParameterSourceFactory.
  This may suffice for basic use cases. For more sophisticated options, consider passing in one or more ProcedureParameter values. See Defining Parameter Sources. Optional.
- use-payload-as-parameter-source: (Not available for the stored procedure inbound channel adapter.) If set to true, the payload of the Message is used as a source for providing parameters. If set to false, however, the entire Message is available as a source for parameters.
  If no procedure parameters are passed in, this property defaults to true. This means that, by using a default BeanPropertySqlParameterSourceFactory, the bean properties of the payload are used as a source for parameter values for the stored procedure or stored function.
  However, if procedure parameters are passed in, this property (by default) evaluates to false. ProcedureParameter lets SpEL expressions be provided. Therefore, it is highly beneficial to have access to the entire Message. The property is set on the underlying StoredProcExecutor. Optional.
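The effect of jdbc-call-operations-cache-size can be pictured as a size-bounded cache keyed by the resolved stored procedure name. The following stdlib-only sketch illustrates that idea. It is not the framework's implementation: the ProcedureCallCache class, the least-recently-used eviction order, and the factory function are assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of a bounded cache keyed by stored procedure name. */
class ProcedureCallCache<V> {

    private final int maxSize;
    private final Function<String, V> factory;
    private final Map<String, V> cache;
    private int created;

    ProcedureCallCache(int maxSize, Function<String, V> factory) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("cache size must not be negative");
        }
        this.maxSize = maxSize;
        this.factory = factory;
        // Access-ordered map: the least recently used entry is evicted first (assumed policy).
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > ProcedureCallCache.this.maxSize;
            }
        };
    }

    /** Returns the cached value for the name, creating it on a miss. */
    synchronized V get(String procedureName) {
        if (maxSize == 0) {
            // A size of 0 disables caching: every call creates a new instance.
            created++;
            return factory.apply(procedureName);
        }
        V value = cache.get(procedureName);
        if (value == null) {
            created++;
            value = factory.apply(procedureName);
            cache.put(procedureName, value);
        }
        return value;
    }

    synchronized int createdCount() {
        return created;
    }

    synchronized int size() {
        return cache.size();
    }
}
```

With a dynamic stored-procedure-name-expression, a cache that is smaller than the number of distinct procedure names leads to repeated re-creation of call objects, which is the trade-off the cache size attribute controls.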
Common Configuration Sub-Elements
The stored procedure components share a common set of child elements that you can use to define and pass parameters to stored procedures or stored functions. The following elements are available:
- parameter
- returning-resultset
- sql-parameter-definition
- poller

- parameter: Provides a mechanism to provide stored procedure parameters. Parameters can be either static or provided by using SpEL expressions.

<int-jdbc:parameter name=""         (1)
                    type=""         (2)
                    value=""/>      (3)
<int-jdbc:parameter name=""
                    expression=""/> (4)

1 | The name of the parameter to be passed into the stored procedure or stored function. Required. |
2 | This attribute specifies the type of the value. If nothing is provided, this attribute defaults to java.lang.String. This attribute is used only when the value attribute is used. Optional. |
3 | The value of the parameter. You must provide either this attribute or the expression attribute. Optional. |
4 | Instead of the value attribute, you can specify a SpEL expression for passing the value of the parameter. If you specify the expression, the value attribute is not allowed. Optional. |

- returning-resultset: Stored procedures may return multiple result sets. By setting one or more returning-resultset elements, you can specify RowMappers to convert each returned ResultSet to meaningful objects. Optional.

<int-jdbc:returning-resultset name="" row-mapper="" />

- sql-parameter-definition: If you use a database that is fully supported, you typically do not have to specify the stored procedure parameter definitions. Instead, those parameters can be automatically derived from the JDBC metadata. However, if you use databases that are not fully supported, you must set those parameters explicitly by using the sql-parameter-definition element.
  You can also choose to turn off any processing of parameter metadata information obtained through JDBC by using the ignore-column-meta-data attribute.

<int-jdbc:sql-parameter-definition
    name=""                           (1)
    direction="IN"                    (2)
    type="STRING"                     (3)
    scale="5"                         (4)
    type-name="FOO_STRUCT"            (5)
    return-type="fooSqlReturnType"/>  (6)

1 | Specifies the name of the SQL parameter. Required. |
2 | Specifies the direction of the SQL parameter definition. Defaults to IN. Valid values are: IN, OUT, and INOUT. If your procedure is returning result sets, use the returning-resultset element. Optional. |
3 | The SQL type used for this SQL parameter definition. Translates into an integer value, as defined by java.sql.Types. Alternatively, you can provide the integer value as well. If this attribute is not explicitly set, it defaults to 'VARCHAR'. Optional. |
4 | The scale of the SQL parameter. Only used for numeric and decimal parameters. Optional. |
5 | The typeName for types that are user-named, such as: STRUCT, DISTINCT, JAVA_OBJECT, and named array types. This attribute is mutually exclusive with the scale attribute. Optional. |
6 | The reference to a custom value handler for complex types. An implementation of SqlReturnType. This attribute is mutually exclusive with the scale attribute and is only applicable for OUT and INOUT parameters. Optional. |

- poller: Lets you configure a message poller if this endpoint is a PollingConsumer. Optional.
Defining Parameter Sources
Parameter sources govern the techniques of retrieving and mapping the Spring Integration message properties to the relevant stored procedure input parameters.
The stored procedure components follow certain rules.
By default, the bean properties of the Message
payload are used as a source for the stored procedure’s input parameters.
In that case, a BeanPropertySqlParameterSourceFactory
is used.
This may suffice for basic use cases.
The next example illustrates that default behavior.
For the “automatic” lookup of bean properties by using the BeanPropertySqlParameterSourceFactory to work, your bean properties must be defined in lower case.
This is due to the fact that in org.springframework.jdbc.core.metadata.CallMetaDataContext (the Java method is matchInParameterValuesWithCallParameters()), the retrieved stored procedure parameter declarations are converted to lower case.
As a result, if you have camel-case bean properties (such as lastName), the lookup fails.
In that case, provide an explicit ProcedureParameter.
Suppose we have a payload that consists of a simple bean with the following three properties: id
, name
, and description
.
Furthermore, we have a simplistic Stored Procedure called INSERT_COFFEE
that accepts three input parameters: id
, name
, and description
.
We also use a fully supported database.
In that case, the following configuration for a stored procedure outbound adapter suffices:
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
channel="insertCoffeeProcedureRequestChannel"
stored-procedure-name="INSERT_COFFEE"/>
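The lower-case requirement for bean properties can be made concrete with a simplified model. The sketch below is not Spring's code: it only mimics a lookup that lower-cases the declared parameter name and then searches the payload for a matching getter in a case-sensitive way. The Coffee class and the resolve helper are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.Locale;
import java.util.Optional;

class LowerCaseLookupSketch {

    /** Hypothetical payload with one lower-case and one camel-case property. */
    public static class Coffee {
        public String getName() {
            return "Mocha";
        }

        public String getLastName() {
            return "Blend";
        }
    }

    /**
     * Lower-cases the declared parameter name, then looks for a getter whose
     * property name matches exactly (assumed, simplified matching rule).
     */
    static Optional<Object> resolve(Object payload, String declaredParameterName) {
        String property = declaredParameterName.toLowerCase(Locale.ROOT);
        String getter = "get" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
        try {
            Method method = payload.getClass().getMethod(getter);
            return Optional.ofNullable(method.invoke(payload));
        } catch (NoSuchMethodException e) {
            // No property with the lower-cased name: the automatic lookup finds nothing.
            return Optional.empty();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this model, a parameter declared as NAME resolves to the name property, while LASTNAME becomes lastname and does not match the camel-case lastName property.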
For more sophisticated options, consider passing in one or more ProcedureParameter
values.
If you do provide ProcedureParameter
values explicitly, by default, an ExpressionEvaluatingSqlParameterSourceFactory
is used for parameter processing, to enable the full power of SpEL expressions.
If you need even more control over how parameters are retrieved, consider passing in a custom implementation of SqlParameterSourceFactory
by using the sql-parameter-source-factory
attribute.
Stored Procedure Inbound Channel Adapter
The following listing calls out the attributes that matter for a stored procedure inbound channel adapter:
<int-jdbc:stored-proc-inbound-channel-adapter
    channel=""                      (1)
    stored-procedure-name=""
    data-source=""
    auto-startup="true"
    id=""
    ignore-column-meta-data="false"
    is-function="false"
    skip-undeclared-results=""      (2)
    return-value-required="false">  (3)
    <int:poller/>
    <int-jdbc:sql-parameter-definition name="" direction="IN"
                                       type="STRING"
                                       scale=""/>
    <int-jdbc:parameter name="" type="" value=""/>
    <int-jdbc:parameter name="" expression=""/>
    <int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
1 | Channel to which polled messages are sent.
If the stored procedure or function does not return any data, the payload of the Message is null.
Required. |
2 | If this attribute is set to true, all results from a stored procedure call that do not have a corresponding SqlOutParameter declaration are bypassed.
For example, stored procedures can return an update count value, even though your stored procedure declared only a single result parameter.
The exact behavior depends on the database implementation.
The value is set on the underlying JdbcTemplate.
The value defaults to true.
Optional. |
3 | Indicates whether this procedure’s return value should be included. Since Spring Integration 3.0. Optional. |
Stored Procedure Outbound Channel Adapter
The following listing calls out the attributes that matter for a stored procedure outbound channel adapter:
<int-jdbc:stored-proc-outbound-channel-adapter channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
order="" (2)
sql-parameter-source-factory=""
use-payload-as-parameter-source="">
<int:poller fixed-rate=""/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name=""/>
</int-jdbc:stored-proc-outbound-channel-adapter>
1 | The receiving message channel of this endpoint. Required. |
2 | Specifies the order for invocation when this endpoint is connected as a subscriber to a channel.
This is particularly relevant when that channel is using a failover dispatching strategy.
It has no effect when this endpoint is itself a polling consumer for a channel with a queue.
Optional. |
Stored Procedure Outbound Gateway
The following listing calls out the attributes that matter for a stored procedure outbound gateway:
<int-jdbc:stored-proc-outbound-gateway request-channel=""  (1)
    stored-procedure-name=""
    data-source=""
    auto-startup="true"
    id=""
    ignore-column-meta-data="false"
    is-function="false"
    order=""
    reply-channel=""                (2)
    reply-timeout=""                (3)
    return-value-required="false"   (4)
    skip-undeclared-results=""      (5)
    sql-parameter-source-factory=""
    use-payload-as-parameter-source="">
    <int-jdbc:sql-parameter-definition name="" direction="IN"
                                       type=""
                                       scale="10"/>
    <int-jdbc:sql-parameter-definition name=""/>
    <int-jdbc:parameter name="" type="" value=""/>
    <int-jdbc:parameter name="" expression=""/>
    <int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-outbound-gateway>
1 | The receiving message channel of this endpoint. Required. |
2 | Message channel to which replies should be sent after receiving the database response. Optional. |
3 | Lets you specify how long this gateway waits for the reply message to be sent successfully before throwing an exception.
Keep in mind that, when sending to a DirectChannel, the invocation occurs in the sender’s thread.
Consequently, the failing of the send operation may be caused by other components further downstream.
The value is specified in milliseconds.
Optional. |
4 | Indicates whether this procedure’s return value should be included. Optional. |
5 | If the skip-undeclared-results attribute is set to true, all results from a stored procedure call that do not have a corresponding SqlOutParameter declaration are bypassed.
For example, stored procedures may return an update count value, even though your stored procedure only declared a single result parameter.
The exact behavior depends on the database.
The value is set on the underlying JdbcTemplate.
The value defaults to true.
Optional. |
Examples
This section contains two examples that call Apache Derby stored procedures.
The first example calls a stored procedure that returns a ResultSet
.
By using a RowMapper
, the data is converted into a domain object, which then becomes the Spring Integration message payload.
In the second sample, we call a stored procedure that uses output parameters to return data instead.
Have a look at the Spring Integration Samples project. The project contains the Apache Derby example referenced here, as well as instructions on how to run it. The Spring Integration Samples project also provides an example of using Oracle stored procedures.
In the first example, we call a stored procedure named FIND_ALL_COFFEE_BEVERAGES
that does not define any input parameters but that returns a ResultSet
.
In Apache Derby, stored procedures are implemented in Java. The following listing shows the method signature:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
throws SQLException {
...
}
The following listing shows the corresponding SQL:
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';
In Spring Integration, you can now call this stored procedure by using, for example, a stored-proc-outbound-gateway
, as the following example shows:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
data-source="dataSource"
request-channel="findAllProcedureRequestChannel"
expect-single-result="true"
stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>
In the second example, we call a stored procedure named FIND_COFFEE
that has one input parameter.
Instead of returning a ResultSet
, it uses an output parameter.
The following example shows the method signature:
public static void findCoffee(int coffeeId, String[] coffeeDescription)
throws SQLException {
...
}
The following listing shows the corresponding SQL:
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';
In Spring Integration, you can now call this Stored Procedure by using, for example, a stored-proc-outbound-gateway
, as the following example shows:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
data-source="dataSource"
request-channel="findCoffeeProcedureRequestChannel"
skip-undeclared-results="true"
stored-procedure-name="FIND_COFFEE"
expect-single-result="true">
<int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>
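In a Derby Java procedure such as FIND_COFFEE, an OUT parameter is passed as a single-element array that the method fills in, which is why findCoffee takes a String[] argument. The following sketch shows what such a method body could look like. The in-memory COFFEES map stands in for the database query that a real procedure would run, and both the map and its sample descriptions are assumptions made for this illustration.

```java
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

class DerbyOutParameterSketch {

    // Hypothetical stand-in for the table a real procedure would query.
    private static final Map<Integer, String> COFFEES = new HashMap<>();

    static {
        COFFEES.put(1, "Espresso");
        COFFEES.put(2, "Mocha");
    }

    /** Same signature as the FIND_COFFEE method shown above. */
    public static void findCoffee(int coffeeId, String[] coffeeDescription)
            throws SQLException {
        // The OUT parameter value is written to index 0 of the array.
        coffeeDescription[0] = COFFEES.get(coffeeId);
    }

    /** Shows how a plain Java caller would read the OUT value back. */
    static String describe(int coffeeId) {
        String[] out = new String[1];
        try {
            findCoffee(coffeeId, out);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return out[0];
    }
}
```

When the procedure is invoked through the gateway, the ID input parameter is taken from the message payload, and the value written to the array is what comes back as the COFFEE_DESCRIPTION output parameter.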
JDBC Lock Registry
Version 4.3 introduced the JdbcLockRegistry
.
Certain components (for example, aggregator and resequencer) use a lock obtained from a LockRegistry
instance to ensure that only one thread manipulates a group at a time.
The DefaultLockRegistry
performs this function within a single component.
You can now configure an external lock registry on these components.
When used with a shared MessageGroupStore
, you can use the JdbcLockRegistry
to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread can generally acquire the lock immediately. If a lock is released by a thread that uses a different registry instance, it can take up to 100ms to acquire the lock.
The JdbcLockRegistry
is based on the LockRepository
abstraction, which has a DefaultLockRepository
implementation.
The database schema scripts are located in the org.springframework.integration.jdbc
package, with a separate script for each supported RDBMS vendor.
For example, the following listing shows the H2 DDL for the lock table:
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
The INT_ prefix can be changed according to the target database design requirements.
To do so, set the prefix property on the DefaultLockRepository bean definition.
Sometimes, an application gets into a state in which it can neither release the distributed lock nor remove its record from the database.
Another application can expire such a stale lock on its next locking invocation.
The timeToLive (TTL) option on the DefaultLockRepository is provided for this purpose.
You may also want to specify CLIENT_ID
for the locks stored for a given DefaultLockRepository
instance.
If so, you can specify the id
to be associated with the DefaultLockRepository
as a constructor parameter.
Starting with version 5.1.8, the JdbcLockRegistry can be configured with idleBetweenTries, a Duration to sleep between lock record insert/update executions.
By default, it is 100 milliseconds. In some environments, that default causes non-leader instances to open connections to the data source too often.
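The options described so far (table prefix, time to live, client id, and idleBetweenTries) can be combined in Java configuration. The following fragment is a sketch under stated assumptions rather than a recommended setup: the MYAPP_ prefix, the client id string, and the chosen durations are made-up values, and a DataSource bean is assumed to exist elsewhere in the application context.

```java
import java.time.Duration;

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.lock.LockRepository;

@Configuration
public class LockRegistryConfiguration {

    @Bean
    public DefaultLockRepository lockRepository(DataSource dataSource) {
        // The second constructor argument is stored as CLIENT_ID (hypothetical value).
        DefaultLockRepository repository =
                new DefaultLockRepository(dataSource, "order-service-node-1");
        // Use MYAPP_LOCK instead of INT_LOCK (hypothetical prefix).
        repository.setPrefix("MYAPP_");
        // Time to live in milliseconds; stale locks older than this can be taken over.
        repository.setTimeToLive(30_000);
        return repository;
    }

    @Bean
    public JdbcLockRegistry lockRegistry(LockRepository lockRepository) {
        JdbcLockRegistry registry = new JdbcLockRegistry(lockRepository);
        // Wait longer between insert/update attempts than the 100 ms default.
        registry.setIdleBetweenTries(Duration.ofMillis(500));
        return registry;
    }
}
```

A longer idle interval reduces load on the data source at the cost of a slower hand-over when a lock held by another instance is released.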
Starting with version 5.4, the RenewableLockRegistry interface has been introduced and is implemented by JdbcLockRegistry.
Call the renewLock() method while the lock is held if the locked process may run longer than the lock’s time to live.
The time to live can therefore be reduced considerably, and deployments can retake a lost lock quickly.
The lock renewal can be done only if the lock is held by the current thread.
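The interplay between time to live and renewal can be modelled in a few lines. The sketch below is an in-memory simplification and not the JdbcLockRegistry implementation: the LockRecord class, its field names, and the explicit nowMillis arguments are assumptions chosen to keep the example deterministic.

```java
/** Hypothetical in-memory model of one lock row with a time to live. */
class LockRecord {

    private final long ttlMillis;
    private String clientId;      // null while nobody holds the lock
    private long createdMillis;   // plays the role of CREATED_DATE

    LockRecord(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    private boolean expired(long nowMillis) {
        return nowMillis - createdMillis > ttlMillis;
    }

    /** Succeeds if the lock is free, already held by the caller, or expired. */
    synchronized boolean acquire(String client, long nowMillis) {
        if (clientId == null || clientId.equals(client) || expired(nowMillis)) {
            clientId = client;
            createdMillis = nowMillis;
            return true;
        }
        return false;
    }

    /** Resets the creation time, but only for the current holder. */
    synchronized boolean renew(String client, long nowMillis) {
        if (client.equals(clientId)) {
            createdMillis = nowMillis;
            return true;
        }
        return false;
    }
}
```

In this model, a holder that renews regularly keeps the lock even with a short time to live, while a holder that stops renewing loses it to the next acquirer once the time to live has passed.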
Starting with version 5.5.6, the JdbcLockRegistry supports automatic cleanup of the JdbcLock cache held in JdbcLockRegistry.locks.
The cache capacity can be set through JdbcLockRegistry.setCacheCapacity().
See its JavaDocs for more information.
Starting with version 6.0, the DefaultLockRepository
can be supplied with a PlatformTransactionManager
instead of relying on the primary bean from the application context.
Starting with version 6.1, the DefaultLockRepository
can be configured with custom insert, update, and renew queries.
For this purpose, the respective setters and getters are exposed.
For example, an insert query with a PostgreSQL hint can be configured like this:
lockRepository.setInsertQuery(lockRepository.getInsertQuery() + " ON CONFLICT DO NOTHING");
JDBC Metadata Store
Version 5.0 introduced the JDBC MetadataStore
(see Metadata Store) implementation.
You can use the JdbcMetadataStore
to maintain the metadata state across application restarts.
This MetadataStore implementation can be used with inbound channel adapters that keep track of metadata, such as the Feed inbound channel adapter.
To configure such an adapter to use the JdbcMetadataStore, declare a Spring bean by using a bean name of metadataStore.
The Feed inbound channel adapter automatically picks up and uses the declared JdbcMetadataStore, as the following example shows:
@Bean
public MetadataStore metadataStore(DataSource dataSource) {
return new JdbcMetadataStore(dataSource);
}
The org.springframework.integration.jdbc
package has database schema scripts for several RDBMS vendors.
For example, the following listing shows the H2 DDL for the metadata table:
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
You can change the INT_
prefix to match the target database design requirements.
You can also configure JdbcMetadataStore
to use the custom prefix.
The JdbcMetadataStore
implements ConcurrentMetadataStore
, letting it be reliably shared across multiple application instances, where only one instance can store or modify a key’s value.
All of these operations are atomic, thanks to transaction guarantees.
Transaction management is required to use the JdbcMetadataStore.
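The guarantee that only one instance can store or modify a key's value rests on the compare-and-set style operations that ConcurrentMetadataStore defines (putIfAbsent and replace). The sketch below imitates those semantics with a ConcurrentHashMap to show how competing instances are resolved. It is an in-memory analogy, not the JdbcMetadataStore, which relies on database transactions instead. The ClaimSketch class and the key and owner strings are made up for this example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory analogy for putIfAbsent/replace semantics. */
class ClaimSketch {

    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Returns true only for the first caller that records the key. */
    boolean claim(String key, String owner) {
        return store.putIfAbsent(key, owner) == null;
    }

    /** Changes the value only if it still holds the expected old value. */
    boolean handOver(String key, String expectedOwner, String newOwner) {
        return store.replace(key, expectedOwner, newOwner);
    }

    String ownerOf(String key) {
        return store.get(key);
    }
}
```

If two application instances try to claim the same key, exactly one of them sees a successful result, and the other can skip the work associated with that key.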
Inbound channel adapters can be supplied with a reference to the TransactionManager
in the poller configuration.
Unlike non-transactional MetadataStore
implementations, with JdbcMetadataStore
, the entry appears in the target table only after the transaction commits.
When a rollback occurs, no entries are added to the INT_METADATA_STORE
table.
Since version 5.0.7, you can configure the JdbcMetadataStore
with the RDBMS vendor-specific lockHint
option for lock-based queries on metadata store entries.
By default, it is FOR UPDATE
and can be configured with an empty string if the target database does not support row locking functionality.
Consult your vendor’s documentation for the hints that can be used in a SELECT
expression to lock rows before updates.
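The table prefix and lock hint options can be set when declaring the metadataStore bean. The following fragment is a sketch under stated assumptions: the MYAPP_ prefix is a made-up value, the empty lock hint is only appropriate for a database without row locking support, and a DataSource bean is assumed to exist elsewhere in the application context.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;

@Configuration
public class MetadataStoreConfiguration {

    @Bean
    public JdbcMetadataStore metadataStore(DataSource dataSource) {
        JdbcMetadataStore store = new JdbcMetadataStore(dataSource);
        // Use MYAPP_METADATA_STORE instead of INT_METADATA_STORE (hypothetical prefix).
        store.setTablePrefix("MYAPP_");
        // Replace the default FOR UPDATE hint; an empty string omits the hint entirely.
        store.setLockHint("");
        return store;
    }
}
```

The schema script for the target database has to be adapted to the same prefix, because the store only changes the table name it queries.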