Spring Integration provides channel adapters for receiving and sending messages by using database queries. Through those adapters, Spring Integration supports not only plain JDBC SQL queries but also stored procedure and stored function calls.
By default, the following JDBC components are available:
The Spring Integration JDBC Module also provides a JDBC Message Store.
The main function of an inbound channel adapter is to execute a SQL SELECT
query and turn the result set into a message.
The message payload is the whole result set (expressed as a List
), and the types of the items in the list depend on the row-mapping strategy.
The default strategy is a generic mapper that returns a Map
for each row in the query result.
Optionally, you can change this by adding a reference to a RowMapper
instance (see the Spring JDBC documentation for more detailed information about row mapping).
Note | |
---|---|
If you want to convert rows in the |
The inbound adapter also requires a reference to either a JdbcTemplate
instance or a DataSource
.
As well as the SELECT
statement to generate the messages, the adapter also has an UPDATE
statement that marks the records as processed so that they do not show up in the next poll.
The update can be parameterized by the list of IDs from the original select.
By default, this is done through a naming convention (a column in the input result set called id
is translated into a list in the parameter map for the update called id
).
The following example defines an inbound channel adapter with an update query and a DataSource
reference.
<int-jdbc:inbound-channel-adapter query="select * from item where status=2" channel="target" data-source="dataSource" update="update item set status=10 where id in (:id)" />
Note | |
---|---|
The parameters in the update query are specified with a colon ( |
To change the parameter generation strategy, you can inject a SqlParameterSourceFactory
into the adapter to override the default behavior (the adapter has a sql-parameter-source-factory
attribute).
Spring Integration provides ExpressionEvaluatingSqlParameterSourceFactory
, which creates a SpEL-based parameter source, with the results of the query as the #root
object.
(If update-per-row
is true, the root object is the row).
If the same parameter name appears multiple times in the update query, it is evaluated only once, and its result is cached.
You can also use a parameter source for the select query.
In this case, since there is no "result
" object to evaluate against, a single parameter source is used each time (rather than using a parameter source factory).
Starting with version 4.0, you can use Spring to create a SpEL based parameter source, as the following example shows:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status" channel="target" data-source="dataSource" select-sql-parameter-source="parameterSource" /> <bean id="parameterSource" factory-bean="parameterSourceFactory" factory-method="createParameterSourceNoCache"> <constructor-arg value="" /> </bean> <bean id="parameterSourceFactory" class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="parameterExpressions"> <map> <entry key="status" value="@statusBean.which()" /> </map> </property> </bean> <bean id="statusBean" class="foo.StatusDetermination" />
The value
in each parameter expression can be any valid SpEL expression.
The #root
object for the expression evaluation is the constructor argument defined on the parameterSource
bean.
It is static for all evaluations (in the preceding example, an empty String
).
Starting with version 5.0, you ca supply ExpressionEvaluatingSqlParameterSourceFactory
with sqlParameterTypes
to specify the target SQL type for the particular parameter.
The following example provides SQL types for the parameters being used in the query:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status" channel="target" data-source="dataSource" select-sql-parameter-source="parameterSource" /> <bean id="parameterSource" factory-bean="parameterSourceFactory" factory-method="createParameterSourceNoCache"> <constructor-arg value="" /> </bean> <bean id="parameterSourceFactory" class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="sqlParameterTypes"> <map> <entry key="status" value="#{ T(java.sql.Types).BINARY}" /> </map> </property> </bean>
Important | |
---|---|
Use the |
The inbound adapter accepts a regular Spring Integration poller as a child element. Consequently, the frequency of the polling can be controlled (among other uses). An important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, as the following example shows:
<int-jdbc:inbound-channel-adapter query="..." channel="target" data-source="dataSource" update="..."> <int:poller fixed-rate="1000"> <int:transactional/> </int:poller> </int-jdbc:inbound-channel-adapter>
Note | |
---|---|
If you do not explicitly specify a poller, a default value is used. As is normal with Spring Integration, it can be defined as a top-level bean). |
In the preceding example, the database is polled every 1000 milliseconds (or once a second), and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown. However, as long as it is aware of the data source, the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.
The JDBC inbound channel adapter defines an attribute called max-rows
.
When you specify the adapter’s poller, you can also define a property called max-messages-per-poll
.
While these two attributes look similar, their meaning is quite different.
max-messages-per-poll
specifies the number of times the query is executed per polling interval, whereas max-rows
specifies the number of rows returned for each execution.
Under normal circumstances, you would likely not want to set the poller’s max-messages-per-poll
property when you use the JDBC inbound channel adapter.
Its default value is 1
, which means that the JDBC inbound channel adapter’s receive()
method is executed exactly once for each poll interval.
Setting the max-messages-per-poll
attribute to a larger value means that the query is executed that many times back to back.
For more information regarding the max-messages-per-poll
attribute, see Section 6.3.1, “Configuring An Inbound Channel Adapter”.
In contrast, the max-rows
attribute, if greater than 0
, specifies the maximum number of rows to be used from the query result set created by the receive()
method.
If the attribute is set to 0
, all rows are included in the resulting message.
The attribute defaults to 0
.
Note | |
---|---|
It is recommended to use result set limiting via vendor-specific query options, for example MySQL |
The outbound channel adapter is the inverse of the inbound: Its role is to handle a message and use it to execute a SQL query. By default, the message payload and headers are available as input parameters to the query, as the following example shows:
<int-jdbc:outbound-channel-adapter query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])" data-source="dataSource" channel="input"/>
In the preceding example, messages arriving on the channel labelled input
have a payload of a map with a key of something
, so the []
operator dereferences that value from the map.
The headers are also accessed as a map.
Note | |
---|---|
The parameters in the preceding query are bean property expressions on the incoming message (not SpEL expressions).
This behavior is part of the |
The outbound adapter requires a reference to either a DataSource
or a JdbcTemplate
.
You can also inject a SqlParameterSourceFactory
to control the binding of each incoming message to a query.
If the input channel is a direct channel, the outbound adapter runs its query in the same thread and, therefore, the same transaction (if there is one) as the sender of the message.
A common requirement for most JDBC channel adapters is to pass parameters as part of SQL queries or stored procedures or functions.
As mentioned earlier, these parameters are by default bean property expressions, not SpEL expressions.
However, if you need to pass SpEL expression as parameters, you must explicitly inject a SqlParameterSourceFactory
.
The following example uses a ExpressionEvaluatingSqlParameterSourceFactory
to achieve that requirement:
<jdbc:outbound-channel-adapter data-source="dataSource" channel="input" query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) \ values (:id, :payload, :createdDate)" sql-parameter-source-factory="spelSource"/> <bean id="spelSource" class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="parameterExpressions"> <map> <entry key="id" value="headers['id'].toString()"/> <entry key="createdDate" value="new java.util.Date()"/> <entry key="payload" value="payload"/> </map> </property> </bean>
For further information, see Section 21.5.5, “Defining Parameter Sources”.
Sometimes, the flexibility and loose-coupling of SqlParameterSourceFactory
does not do what we need for the target PreparedStatement
or we need to do some low-level JDBC work.
The Spring JDBC module provides APIs to configure the execution environment (such as ConnectionCallback
or PreparedStatementCreator
) and manipulate parameter values (such as SqlParameterSource
).
It can even access APIs for low-level operations, such as StatementCallback
.
Starting with Spring Integration 4.2, MessagePreparedStatementSetter
allows the specification of parameters on the PreparedStatement
manually, in the requestMessage
context.
This class plays exactly the same role as PreparedStatementSetter
in the standard Spring JDBC API.
Actually, it is invoked directly from an inline PreparedStatementSetter
implementation when the JdbcMessageHandler
invokes execute
on the JdbcTemplate
.
This functional interface option is mutually exclusive with sqlParameterSourceFactory
and can be used as a more powerful alternative to populate parameters of the PreparedStatement
from the requestMessage
.
For example, it is useful when we need to store File
data to the DataBase BLOB
column in a streaming manner.
The following example shows how to do so:
@Bean @ServiceActivator(inputChannel = "storeFileChannel") public MessageHandler jdbcMessageHandler(DataSource dataSource) { JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource, "INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)"); jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> { ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME)); try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) { ps.setBlob(2, inputStream); } catch (Exception e) { throw new MessageHandlingException(m, e); } ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class))); }); return jdbcMessageHandler; }
From the XML configuration perspective, the prepared-statement-setter
attribute is available on the <int-jdbc:outbound-channel-adapter>
component.
It lets you specify a MessagePreparedStatementSetter
bean reference.
The outbound gateway is like a combination of the outbound and inbound adapters: Its role is to handle a message and use it to execute a SQL query and then respond with the result by sending it to a reply channel. By default, the message payload and headers are available as input parameters to the query, as the following example shows:
<int-jdbc:outbound-gateway update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])" request-channel="input" reply-channel="output" data-source="dataSource" />
The result of the preceding example is to insert a record into the mythings
table and return a message that indicates the number of rows affected (the payload is a map: {UPDATED=1}
) to the output channel .
If the update query is an insert with auto-generated keys, you can populate the reply message with the generated keys by adding keys-generated="true"
to the preceding example (this is not the default because it is not supported by some database platforms).
The following example shows the changed configuration:
<int-jdbc:outbound-gateway update="insert into mything (status, name) values (0, :payload[thing])" request-channel="input" reply-channel="output" data-source="dataSource" keys-generated="true"/>
Instead of the update count or the generated keys, you can also provide a select query to execute and generate a reply message from the result (such as the inbound adapter), as the following example shows:
<int-jdbc:outbound-gateway update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])" query="select * from foos where id=:headers[$id]" request-channel="input" reply-channel="output" data-source="dataSource"/>
Since Spring Integration 2.2, the update SQL query is no longer mandatory.
You can now provide only a select query, by using either the query
attribute or the query
element.
This is extremely useful if you need to actively retrieve data by using, for example, a generic gateway or a payload enricher.
The reply message is then generated from the result (similar to how the inbound adapter works) and passed to the reply channel.
The following example show to use the query
attribute:
<int-jdbc:outbound-gateway query="select * from foos where id=:headers[id]" request-channel="input" reply-channel="output" data-source="dataSource"/>
Important | |
---|---|
By default, the component for the |
As with the channel adapters, you can also provide SqlParameterSourceFactory
instances for request and reply.
The default is the same as for the outbound adapter, so the request message is available as the root of an expression.
If keys-generated="true"
, the root of the expression is the generated keys (a map if there is only one or a list of maps if multi-valued).
The outbound gateway requires a reference to either a DataSource
or a JdbcTemplate
.
It can also have a SqlParameterSourceFactory
injected to control the binding of the incoming message to the query.
Starting with the version 4.2, the request-prepared-statement-setter
attribute is available on the <int-jdbc:outbound-gateway>
as an alternative to request-sql-parameter-source-factory
.
It lets you specify a MessagePreparedStatementSetter
bean reference, which implements more sophisticated PreparedStatement
preparation before its execution.
See Section 21.2, “Outbound Channel Adapter” for more information about MessagePreparedStatementSetter
.
Spring Integration provides two JDBC specific message store implementations.
The JdbcMessageStore
is suitable for use with aggregators and the claim check pattern.
The JdbcChannelMessageStore
implementation provides a more targeted and scalable implementation specifically for message channel.
Note that you can use a JdbcMessageStore
to back a message channel, JdbcChannelMessageStore
is optimized for that purpose.
Before starting to use JDBC message store components, you should provision a target database with the appropriate objects.
Spring Integration ships with some sample scripts that can be used to initialize a database.
In the spring-integration-jdbc
JAR file, you can find scripts in the org.springframework.integration.jdbc
package.
It provides an example create and an example drop script for a range of common database platforms.
A common way to use these scripts is to reference them in a Spring JDBC data source initializer.
Note that the scripts are provided as samples and as specifications of the the required table and column names.
You may find that you need to enhance them for production use (for, example, by adding index declarations).
The JDBC module provides an implementation of the Spring Integration MessageStore
(important in the claim check pattern) and MessageGroupStore
(important in stateful patterns such as an aggregator) backed by a database.
Both interfaces are implemented by the JdbcMessageStore
, and there is support for configuring store instances in XML, as the following example shows:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
You can specify a JdbcTemplate
instead of a DataSource
.
The following example shows some other optional attributes:
<int-jdbc:message-store id="messageStore" data-source="dataSource" lob-handler="lobHandler" table-prefix="MY_INT_"/>
In the preceding example, we have specified a LobHandler
for dealing with messages as large objects (which is often necessary for Oracle) and a prefix for the table names in the queries generated by the store.
The table name prefix defaults to INT_
.
If you intend to backing message channels with JDBC, we recommend using the JdbcChannelMessageStore
implementation.
It works only in conjunction with Message Channels.
The JdbcChannelMessageStore
uses database-specific SQL queries to retrieve messages from the database.
Therefore, you must set the ChannelMessageStoreQueryProvider
property on the JdbcChannelMessageStore
.
This channelMessageStoreQueryProvider
provides the SQL queries for the particular database you specify.
Spring Integration provides support for the following relational databases:
If your database is not listed, you can extend the AbstractChannelMessageStoreQueryProvider
class and provide your own custom queries.
Version 4.0 added the MESSAGE_SEQUENCE
column to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.
Since version 5.0, by overloading the ChannelMessageStorePreparedStatementSetter
class, you can provide a custom implementation for message insertion in the JdbcChannelMessageStore
.
You can use it to set different columns or change the table structure or serialization strategy.
For example, instead of default serialization to byte[]
, you can store its structure as a JSON string.
The following example uses the default implementation of setValues
to store common columns and overrides the behavior to store the message payload as a varchar
:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter { public JsonPreparedStatementSetter() { super(); } @Override public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage, Object groupId, String region, boolean priorityEnabled) throws SQLException { // Populate common columns super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled); // Store message payload as varchar preparedStatement.setString(6, requestMessage.getPayload().toString()); } }
Important | |
---|---|
Generally, we do not recommend using a relational database for queuing. Instead, if possible, consider using either JMS- or AMQP-backed channels instead. For further reference, see the following resources: |
When polling a message channel, you have the option to configure the associated Poller
with a TaskExecutor
reference.
Important | |
---|---|
Keep in mind, though, that if you use a JDBC backed message channel and you plan to poll the channel and consequently the message store transactionally with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue and the performance, when using multiple threads, may not materialize as expected. For example, Apache Derby is problematic in that regard. To achieve better JDBC queue throughput and avoid issues when different threads may poll the same <bean id="queryProvider" class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/> <int:transaction-synchronization-factory id="syncFactory"> <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" /> <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/> </int:transaction-synchronization-factory> <task:executor id="pool" pool-size="10" queue-capacity="10" rejection-policy="CALLER_RUNS" /> <bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore"> <property name="dataSource" ref="dataSource"/> <property name="channelMessageStoreQueryProvider" ref="queryProvider"/> <property name="region" value="TX_TIMEOUT"/> <property name="usingIdCache" value="true"/> </bean> <int:channel id="inputChannel"> <int:queue message-store="store"/> </int:channel> <int:bridge input-channel="inputChannel" output-channel="outputChannel"> <int:poller fixed-delay="500" receive-timeout="500" max-messages-per-poll="1" task-executor="pool"> <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="transactionManager" /> </int:poller> </int:bridge> <int:channel id="outputChannel" /> |
Starting with version 4.0, JdbcChannelMessageStore
implements PriorityCapableChannelMessageStore
and provides the priorityEnabled
option, letting it be used as a message-store
reference for priority-queue
instances.
For this purpose, the INT_CHANNEL_MESSAGE
table has a MESSAGE_PRIORITY
column to store the value of PRIORITY
message headers.
In addition, a new MESSAGE_SEQUENCE
column lets us achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond.
Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
.
Note | |
---|---|
We do not recommend using the same |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore"> <property name="dataSource" ref="dataSource"/> <property name="channelMessageStoreQueryProvider" ref="queryProvider"/> </bean> <int:channel id="queueChannel"> <int:queue message-store="channelStore"/> </int:channel> <bean id="priorityStore" parent="channelStore"> <property name="priorityEnabled" value="true"/> </bean> <int:channel id="priorityChannel"> <int:priority-queue message-store="priorityStore"/> </int:channel>
It is common to use a JdbcMessageStore
as a global store for a group of applications or nodes in the same application.
To provide some protection against name clashes and to give control over the database meta-data configuration, the message store lets the tables be partitioned in two ways.
One way is to use separate table names, by changing the prefix (as described earlier).
The other way is to specify a region
name for partitioning data within a single table.
An important use case for the second approach is when the MessageStore
is managing persistent queues that back a Spring Integration Message Channel.
The message data for a persistent channel is keyed in the store on the channel name.
Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them.
To avoid this danger, you can use the message store region
to keep data separate for different physical channels that have the same logical name.
In certain situations, plain JDBC support is not sufficient. Maybe you deal with legacy relational database schemas or you have complex data processing needs, but, ultimately, you have to use stored procedures or stored functions. Since Spring Integration 2.1, we provide three components to execute stored procedures or stored functions:
In order to enable calls to stored procedures and stored functions, the stored procedure components use the org.springframework.jdbc.core.simple.SimpleJdbcCall
class.
Consequently, the following databases are fully supported for executing stored procedures:
If you want to execute stored functions instead, the following databases are fully supported:
Note | |
---|---|
Even though your particular database may not be fully supported, chances are that you can use the stored procedure Spring Integration components quite successfully anyway, provided your RDBMS supports stored procedures or stored functions. As a matter of fact, some of the provided integration tests use the H2 database. Nevertheless, it is very important to thoroughly test those usage scenarios. |
The stored procedure components provide full XML Namespace support, and configuring the components is similar as for the general purpose JDBC components discussed earlier.
All stored procedure components share certain configuration parameters:
auto-startup
: Lifecycle attribute signaling whether this component should be started during application context startup.
It defaults to true
.
Optional.
data-source
: Reference to a javax.sql.DataSource
, which is used to access the database.
Required.
id
: Identifies the underlying Spring bean definition, which is an instance of either EventDrivenConsumer
or PollingConsumer
, depending on whether the outbound channel adapter’s channel
attribute references a SubscribableChannel
or a PollableChannel
.
Optional.
ignore-column-meta-data
: For fully supported databases, the underlying SimpleJdbcCall
class can automatically retrieve the parameter information for the stored procedure or stored function from the JDBC metadata.
However, if the database does not support metadata lookups or if you need to provide customized parameter definitions, this flag can be set to true
.
It defaults to false
.
Optional.
is-function
: If true
, a SQL Function is called.
In that case, the stored-procedure-name
or stored-procedure-name-expression
attributes define the name of the called function.
It defaults to false
.
Optional.
stored-procedure-name
: This attribute specifies the name of the stored procedure.
If the is-function
attribute is set to true
, this attribute specifies the function name instead.
Either this property or stored-procedure-name-expression
must be specified.
stored-procedure-name-expression
: This attribute specifies the name of the stored procedure by using a SpEL expression.
By using SpEL, you have access to the full message (if available), including its headers and payload.
You can use this attribute to invoke different stored procedures at runtime.
For example, you can provide stored procedure names that you would like to execute as a message header.
The expression must resolve to a String
.
If the is-function
attribute is set to true
, this attribute specifies a stored function.
Either this property or stored-procedure-name
must be specified.
jdbc-call-operations-cache-size
: Defines the maximum number of cached SimpleJdbcCallOperations
instances.
Basically, for each stored procedure name, a new SimpleJdbcCallOperations
instance is created that, in return, is cached.
Note | |
---|---|
Spring Integration 2.2 added the |
The default cache size is 10
.
A value of 0
disables caching.
Negative values are not permitted.
If you enable JMX, statistical information about the jdbc-call-operations-cache
is exposed as an MBean.
See Section 12.2.7, “MBean Exporter” for more information.
sql-parameter-source-factory
: (Not available for the stored procedure inbound channel adapter.)
Reference to a SqlParameterSourceFactory
.
By default, bean properties of the passed in Message
payload are used as a source for the stored procedure’s input parameters by using a BeanPropertySqlParameterSourceFactory
.
This may suffice for basic use cases.
For more sophisticated options, consider passing in one or more ProcedureParameter
values.
See Section 21.5.5, “Defining Parameter Sources”.
Optional.
use-payload-as-parameter-source
: (Not available for the stored procedure inbound channel adapter.)
If set to true
, the payload of the Message
is used as a source for providing parameters.
If set to false
, however, the entire Message
is available as a source for parameters.
If no procedure parameters are passed in, this property defaults to true
.
This means that, by using a default BeanPropertySqlParameterSourceFactory
, the bean properties of the payload are used as a source for parameter values for the stored procedure or stored function.
However, if procedure parameters are passed in, this property (by default) evaluates to false
.
ProcedureParameter
lets SpEL Expressions be provided.
Therefore, it is highly beneficial to have access to the entire Message
.
The property is set on the underlying StoredProcExecutor
.
Optional.
The stored procedure components share a common set of child elements that you can use to define and pass parameters to stored procedures or stored functions. The following elements are available:
parameter
returning-resultset
sql-parameter-definition
poller
parameter
: Provides a mechanism to provide stored procedure parameters.
Parameters can be either static or provided by using a SpEL Expressions.
<int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/>
+
<1> The name of the parameter to be passed into the Stored Procedure or Stored Function.
Required.
<2> This attribute specifies the type of the value.
If nothing is provided, this attribute defaults to java.lang.String
.
This attribute is used only when the value
attribute is used.
Optional.
<3> The value of the parameter.
You must provide either this attribute or the expression
attribute.
Optional.
<4> Instead of the value
attribute, you can specify a SpEL expression for passing the value of the parameter.
If you specify the expression
, the value
attribute is not allowed.
Optional.
Optional.
returning-resultset
: Stored procedures may return multiple result sets.
By setting one or more returning-resultset
elements, you can specify RowMappers
to convert each returned ResultSet
to meaningful objects.
Optional.
<int-jdbc:returning-resultset name="" row-mapper="" />
sql-parameter-definition
: If you use a database that is fully supported, you typically do not have to specify the stored procedure parameter definitions.
Instead, those parameters can be automatically derived from the JDBC metadata.
However, if you use databases that are not fully supported, you must set those parameters explicitly by using the sql-parameter-definition
element.
You can also choose to turn off any processing of parameter metadata information obtained through JDBC by using the ignore-column-meta-data
attribute.
<int-jdbc:sql-parameter-definition name="" direction="IN" type="STRING" scale="5" type-name="FOO_STRUCT" return-type="fooSqlReturnType"/>
Specifies the name of the SQL parameter. Required. | |
Specifies the direction of the SQL parameter definition.
Defaults to | |
The SQL type used for this SQL parameter definition.
Translates into an integer value, as defined by | |
The scale of the SQL parameter. Only used for numeric and decimal parameters. Optional. | |
The | |
The reference to a custom value handler for complex types.
An implementation of |
poller
: Lets you configure a message poller if this endpoint is a PollingConsumer
.
Optional.
Parameter sources govern the techniques of retrieving and mapping the Spring Integration message properties to the relevant stored procedure input parameters.
The stored procedure components follow certain rules.
By default, the bean properties of the Message
payload are used as a source for the stored procedure’s input parameters.
In that case, a BeanPropertySqlParameterSourceFactory
is used.
This may suffice for basic use cases.
The next example illustrates that default behavior.
Important | |
---|---|
For the " |
Suppose we have a payload that consists of a simple bean with the following three properties: id
, name
, and description
.
Furthermore, we have a simplistic Stored Procedure called INSERT_COFFEE
that accepts three input parameters: id
, name
, and description
.
We also use a fully supported database.
In that case, the following configuration for a stored procedure outbound adapter suffices:
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource" channel="insertCoffeeProcedureRequestChannel" stored-procedure-name="INSERT_COFFEE"/>
For more sophisticated options, consider passing in one or more ProcedureParameter
values.
If you do provide ProcedureParameter
values explicitly, by default, an ExpressionEvaluatingSqlParameterSourceFactory
is used for parameter processing, to enable the full power of SpEL expressions.
If you need even more control over how parameters are retrieved, consider passing in a custom implementation of SqlParameterSourceFactory
by using the sql-parameter-source-factory
attribute.
The following listing calls out the attributes that matter for a stored procedure inbound channel adapter:
<int-jdbc:stored-proc-inbound-channel-adapter channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" is-function="false" skip-undeclared-results="" return-value-required="false" <int:poller/> <int-jdbc:sql-parameter-definition name="" direction="IN" type="STRING" scale=""/> <int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/> <int-jdbc:returning-resultset name="" row-mapper="" /> </int-jdbc:stored-proc-inbound-channel-adapter>
Channel to which polled messages are sent.
If the stored procedure or function does not return any data, the payload of the | |
If this attribute is set to | |
Indicates whether this procedure’s return value should be included. Since Spring Integration 3.0. Optional. |
The following listing calls out the attributes that matter for a stored procedure outbound channel adapter:
<int-jdbc:stored-proc-outbound-channel-adapter channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" order="" sql-parameter-source-factory="" use-payload-as-parameter-source=""> <int:poller fixed-rate=""/> <int-jdbc:sql-parameter-definition name=""/> <int-jdbc:parameter name=""/> </int-jdbc:stored-proc-outbound-channel-adapter>
The receiving message channel of this endpoint. Required. | |
Specifies the order for invocation when this endpoint is connected as a subscriber to a channel.
This is particularly relevant when that channel is using a |
The following listing calls out the attributes that matter for a stored procedure outbound channel adapter:
<int-jdbc:stored-proc-outbound-gateway request-channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" is-function="false" order="" reply-channel="" reply-timeout="" return-value-required="false" skip-undeclared-results="" sql-parameter-source-factory="" use-payload-as-parameter-source=""> <int-jdbc:sql-parameter-definition name="" direction="IN" type="" scale="10"/> <int-jdbc:sql-parameter-definition name=""/> <int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/> <int-jdbc:returning-resultset name="" row-mapper="" />
The receiving message channel of this endpoint. Required. | |
Message channel to which replies should be sent after receiving the database response. Optional. | |
Lets you specify how long this gateway waits for the reply message to be sent successfully before throwing an exception.
Keep in mind that, when sending to a | |
Indicates whether this procedure’s return value should be included. Optional. | |
If the |
This section contains two examples that call Apache Derby stored procedures.
The first procedure calls a stored procedure that returns a ResultSet
.
By using a RowMapper
, the data is converted into a domain object, which then becomes the Spring Integration message payload.
In the second sample, we call a stored procedure that uses output parameters to return data instead.
Note | |
---|---|
Have a look at the Spring Integration Samples project. The project contains the Apache Derby example referenced here, as well as instructions on how to run it. The Spring Integration Samples project also provides an example of using Oracle stored procedures. |
In the first example, we call a stored procedure named FIND_ALL_COFFEE_BEVERAGES
that does not define any input parameters but that returns a ResultSet
.
In Apache Derby, stored procedures are implemented in Java. The following listing shows the method signature:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages) throws SQLException { ... }
The following listing shows the corresponding SQL:
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \ PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \ EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';
In Spring Integration, you can now call this stored procedure by using, for example, a stored-proc-outbound-gateway
, as the following example shows:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all" data-source="dataSource" request-channel="findAllProcedureRequestChannel" expect-single-result="true" stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES"> <int-jdbc:returning-resultset name="coffeeBeverages" row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/> </int-jdbc:stored-proc-outbound-gateway>
In the second example, we call a stored procedure named FIND_COFFEE
that has one input parameter.
Instead of returning a ResultSet
, it uses an output parameter.
The following example shows the method signature:
public static void findCoffee(int coffeeId, String[] coffeeDescription) throws SQLException { ... }
The following listing shows the corresponding SQL:
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \ PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \ 'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';
In Spring Integration, you can now call this Stored Procedure by using, for example, a stored-proc-outbound-gateway
, as the following example shows:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee" data-source="dataSource" request-channel="findCoffeeProcedureRequestChannel" skip-undeclared-results="true" stored-procedure-name="FIND_COFFEE" expect-single-result="true"> <int-jdbc:parameter name="ID" expression="payload" /> </int-jdbc:stored-proc-outbound-gateway>
Version 4.3 introduced the JdbcLockRegistry
.
Certain components (for example, aggregator and resequencer) use a lock obtained from a LockRegistry
instance to ensure that only one thread manipulates a group at a time.
The DefaultLockRegistry
performs this function within a single component.
You can now configure an external lock registry on these components.
When used with a shared MessageGroupStore
, you can use the JdbcLockRegistry
to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread can generally acquire the lock immediately. If a lock is released by a thread that uses a different registry instance, it can take up to 100ms to acquire the lock.
The JdbcLockRegistry
is based on the LockRepository
abstraction, which has a DefaultLockRepository
implementation.
The database schema scripts are located in the org.springframework.integration.jdbc
package, which is divided for the particular RDBMS vendors.
For example, the following listing shows the H2 DDL for the lock table:
CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36), REGION VARCHAR(100), CLIENT_ID CHAR(36), CREATED_DATE TIMESTAMP NOT NULL, constraint LOCK_PK primary key (LOCK_KEY, REGION) );
The INT_
can be changed according to the target database design requirements.
Therefore, you must use prefix
property on the DefaultLockRepository
bean definition.
Sometimes, one application has moved to such a state that it cannot release the distributed lock and remove the particular record in the database.
For this purpose, such dead locks can be expired by the other application on the next locking invocation.
The timeToLive
(TTL) option on the DefaultLockRepository
is provided for this purpose.
You may also want to specify CLIENT_ID
for the locks stored for a given DefaultLockRepository
instance.
If so, you can specify the id
to be associated with the DefaultLockRepository
as a constructor parameter.
Version 5.0 introduced the JDBC MetadataStore
(see Section 12.5, “Metadata Store”) implementation.
You can use the JdbcMetadataStore
to maintain the metadata state across application restarts.
This MetadataStore
implementation can be used with adapters such as the following:
To configure these adapters to use the JdbcMetadataStore
, declare a Spring bean by using a bean name of metadataStore
.
The Feed inbound channel adapter and the feed inbound channel adapter both automatically pick up and use the declared JdbcMetadataStore
, as the following example shows:
@Bean public MetadataStore metadataStore(DataSource dataSource) { return new JdbcMetadataStore(dataSource); }
The org.springframework.integration.jdbc
package has Database schema scripts for several RDMBS vendors.
For example, the following listing shows the H2 DDL for the metadata table:
CREATE TABLE INT_METADATA_STORE ( METADATA_KEY VARCHAR(255) NOT NULL, METADATA_VALUE VARCHAR(4000), REGION VARCHAR(100) NOT NULL, constraint METADATA_STORE primary key (METADATA_KEY, REGION) );
You can change the INT_
prefix to match the target database design requirements.
You can also configure JdbcMetadataStore
to use the custom prefix.
The JdbcMetadataStore
implements ConcurrentMetadataStore
, letting it be reliably shared across multiple application instances, where only one instance can store or modify a key’s value.
All of these operations are atomic, thanks to transaction guarantees.
Transaction management must use JdbcMetadataStore
.
Inbound channel adapters can be supplied with a reference to the TransactionManager
in the poller configuration.
Unlike non-transactional MetadataStore
implementations, with JdbcMetadataStore
, the entry appears in the target table only after the transaction commits.
When a rollback occurs, no entries are added to the INT_METADATA_STORE
table.
Since version 5.0.7, you can configure the JdbcMetadataStore
with the RDBMS vendor-specific lockHint
option for lock-based queries on metadata store entries.
By default, it is FOR UPDATE
and can be configured with an empty string if the target database does not support row locking functionality.
Consult with your vendor for particular and possible hints in the SELECT
expression for locking rows before updates.