Spring Integration provides Channel Adapters for receiving and sending messages via database queries. Through those adapters Spring Integration supports not only plain JDBC SQL Queries, but also Stored Procedure and Stored Function calls.
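The following JDBC components are available by default: the Inbound Channel Adapter, the Outbound Channel Adapter, the Outbound Gateway, and the Stored Procedure variants of each (inbound adapter, outbound adapter, and outbound gateway).
Furthermore, the Spring Integration JDBC Module also provides a JDBC Message Store.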
The main function of an inbound Channel Adapter is to execute a SQL SELECT query and turn the result set into a message.
The message payload is the whole result set, expressed as a List
, and the types of the items in the list depend on the row-mapping strategy that is used.
The default strategy is a generic mapper that just returns a Map
for each row in the query result.
Optionally, this can be changed by adding a reference to a RowMapper
instance (see the Spring JDBC documentation for more detailed information about row mapping).
Note | |
---|---|
If you want to convert rows in the SELECT query result to individual messages you can use a downstream splitter. |
The inbound adapter also requires a reference to either a JdbcTemplate
instance or a DataSource
.
In addition to the SELECT statement that generates the messages, the adapter can have an UPDATE statement that marks the records as processed so that they do not show up in the next poll.
The update can be parameterized by the list of ids from the original select.
By default this is done through a naming convention: a column in the input result set called "id" is translated into a list called "id" in the parameter map for the update.
The following example defines an inbound Channel Adapter with an update query and a DataSource
reference.
<int-jdbc:inbound-channel-adapter query="select * from item where status=2" channel="target" data-source="dataSource" update="update item set status=10 where id in (:id)" />
Note | |
---|---|
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which in this case is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC combined with a convention (projection onto the polled result list) adopted in Spring Integration. The underlying Spring JDBC features limit the available expressions (e.g. most special characters other than period are disallowed), but since the target is usually a list of or an individual object addressable by simple bean paths this isn’t unduly restrictive. |
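The row-mapping strategy and the downstream splitter mentioned earlier can be combined with the same kind of adapter. The following is a minimal sketch, not a definitive configuration: the `itemRowMapper` bean, its class `com.example.ItemRowMapper`, and the `items` channel are hypothetical names used only for illustration.

```xml
<!-- Hypothetical RowMapper implementation; replace with your own class -->
<bean id="itemRowMapper" class="com.example.ItemRowMapper"/>

<!-- The payload becomes a List of whatever itemRowMapper returns for each row -->
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource" row-mapper="itemRowMapper">
    <int:poller fixed-rate="1000"/>
</int-jdbc:inbound-channel-adapter>

<!-- The default splitter emits one message per element of the List payload -->
<int:splitter input-channel="target" output-channel="items"/>
```

With this arrangement, consumers of `items` receive one mapped object per message instead of the whole result list.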
To change the parameter generation strategy you can inject a SqlParameterSourceFactory
into the adapter to override the default behavior (the adapter has a sql-parameter-source-factory
attribute).
Spring Integration provides an ExpressionEvaluatingSqlParameterSourceFactory, which creates a SpEL-based parameter source with the results of the query as the #root object.
(If update-per-row
is true, the root object is the row).
If the same parameter name appears multiple times in the update query, it is evaluated only one time, and its result is cached.
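As a sketch of such a factory for the update query, the bean below maps a named parameter to a column of the polled row. It assumes update-per-row is true, so that the #root object is a single row (a Map when the default row mapper is used). The bean id, the parameter name `itemId`, and the column key `ID` are hypothetical.

```xml
<!-- Injected into the adapter to replace the default "id" naming convention -->
<bean id="updateParameterSourceFactory"
      class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <!-- :itemId in the update query is read from the row's ID column -->
            <entry key="itemId" value="#root['ID']"/>
        </map>
    </property>
</bean>
```

The matching update query would then read, for example, `update item set status=10 where id = :itemId`.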
You can also use a parameter source for the select query. In this case, since there is no "result" object to evaluate against, a single parameter source is used each time (rather than using a parameter source factory). Starting with version 4.0, you can use Spring to create a SpEL based parameter source as follows:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status" channel="target" data-source="dataSource" select-sql-parameter-source="parameterSource" /> <bean id="parameterSource" factory-bean="parameterSourceFactory" factory-method="createParameterSourceNoCache"> <constructor-arg value="" /> </bean> <bean id="parameterSourceFactory" class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="parameterExpressions"> <map> <entry key="status" value="@statusBean.which()" /> </map> </property> </bean> <bean id="statusBean" class="foo.StatusDetermination" />
The value
in each parameter expression can be any valid SpEL expression.
The #root
object for the expression evaluation is the constructor argument defined on the parameterSource
bean.
It is static for all evaluations (in this case, an empty String).
Starting with version 5.0, the ExpressionEvaluatingSqlParameterSourceFactory
can be supplied with the sqlParameterTypes
to specify the target SQL type for the particular parameter.
The following example provides the SQL type for the parameter used in the query:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status" channel="target" data-source="dataSource" select-sql-parameter-source="parameterSource" /> <bean id="parameterSource" factory-bean="parameterSourceFactory" factory-method="createParameterSourceNoCache"> <constructor-arg value="" /> </bean> <bean id="parameterSourceFactory" class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="sqlParameterTypes"> <map> <entry key="status" value="#{ T(java.sql.Types).BINARY}" /> </map> </property> </bean>
Important | |
---|---|
Use the |
The inbound adapter accepts a regular Spring Integration poller as a sub element, so for instance the frequency of the polling can be controlled. A very important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, for example:
<int-jdbc:inbound-channel-adapter query="..." channel="target" data-source="dataSource" update="..."> <int:poller fixed-rate="1000"> <int:transactional/> </int:poller> </int-jdbc:inbound-channel-adapter>
Note | |
---|---|
If a poller is not explicitly specified, a default value will be used (and as per normal with Spring Integration can be defined as a top level bean). |
In this example the database is polled every 1000 milliseconds, and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown, but as long as it is aware of the data source then the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread, and hence the same transaction. Then if any of them fail, the transaction rolls back and the input data is reverted to its original state.
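For completeness, a minimal sketch of the transaction manager configuration that the example leaves out is shown here. It assumes a plain JDBC `DataSource` bean named `dataSource`; other transaction manager types may be more appropriate in your environment.

```xml
<!-- Transaction manager bound to the same DataSource that the adapter uses -->
<bean id="transactionManager"
      class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)">
    <int:poller fixed-rate="1000">
        <!-- The attribute may be omitted when the bean is named "transactionManager" -->
        <int:transactional transaction-manager="transactionManager"/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
```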
The JDBC Inbound Channel Adapter defines an attribute max-rows-per-poll
.
When you specify the adapter’s Poller, you can also define a property called max-messages-per-poll
.
While these two attributes look similar, their meaning is quite different.
max-messages-per-poll
specifies the number of times the query is executed per polling interval, whereas max-rows-per-poll
specifies the number of rows returned for each execution.
Under normal circumstances, you would likely not want to set the Poller’s max-messages-per-poll
property when using the JDBC Inbound Channel Adapter.
Its default value is 1, which means that the JDBC Inbound Channel Adapter's receive()
method is executed exactly once for each poll interval.
Setting the max-messages-per-poll
attribute to a larger value means that the query is executed that many times back to back.
For more information regarding the max-messages-per-poll
attribute, please see Section 4.3.1, “Configuring An Inbound Channel Adapter”.
In contrast, the max-rows-per-poll
attribute, if greater than 0, specifies the maximum number of rows that will be used from the query result set, per execution of the receive()
method.
If the attribute is set to 0, then all rows will be included in the resulting message.
If not explicitly set, the attribute defaults to 0.
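The difference between the two attributes can be sketched as follows. The limit of 50 rows is an arbitrary value chosen for illustration.

```xml
<!-- One query execution per poll (max-messages-per-poll is left at its default),
     with at most 50 rows in each resulting message payload -->
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)"
    max-rows-per-poll="50">
    <int:poller fixed-rate="1000"/>
</int-jdbc:inbound-channel-adapter>
```

Rows beyond the limit stay unprocessed and are picked up by later polls, provided the update statement marks only the rows that were returned.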
The outbound Channel Adapter is the inverse of the inbound: its role is to handle a message and use it to execute a SQL query. The message payload and headers are available by default as input parameters to the query, for instance:
<int-jdbc:outbound-channel-adapter query="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])" data-source="dataSource" channel="input"/>
In the example above, messages arriving on the channel labelled input have a payload of a map with key foo, so the []
operator dereferences that value from the map.
The headers are also accessed as a map.
Note | |
---|---|
The parameters in the query above are bean property expressions on the incoming message (not Spring EL expressions).
This behavior is part of the |
The outbound adapter requires a reference to either a DataSource
or a JdbcTemplate
.
It can also have a SqlParameterSourceFactory
injected to control the binding of each incoming message to a query.
If the input channel is a direct channel, then the outbound adapter runs its query in the same thread, and therefore the same transaction (if there is one) as the sender of the message.
Passing Parameters using SpEL Expressions
A common requirement for most JDBC Channel Adapters is to pass parameters as part of SQL queries or Stored Procedures and Functions.
As mentioned above, these parameters are by default bean property expressions, not SpEL expressions.
However, if you need to pass SpEL expressions as parameters, you must inject a SqlParameterSourceFactory explicitly.
The following example uses an ExpressionEvaluatingSqlParameterSourceFactory to meet that requirement.
<int-jdbc:outbound-channel-adapter data-source="dataSource" channel="input" query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)" sql-parameter-source-factory="spelSource"/> <bean id="spelSource" class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="parameterExpressions"> <map> <entry key="id" value="headers['id'].toString()"/> <entry key="createdDate" value="new java.util.Date()"/> <entry key="payload" value="payload"/> </map> </property> </bean>
For further information, please also see Section 19.5.5, “Defining Parameter Sources”.
PreparedStatement Callback
There are some cases when the flexibility and loose coupling of SqlParameterSourceFactory is not enough for the target PreparedStatement, or when we need to do some low-level JDBC work.
The Spring JDBC module provides APIs to configure the execution environment (for example, ConnectionCallback or PreparedStatementCreator) and to manipulate parameter values (for example, SqlParameterSource).
It also provides APIs for low-level operations, such as StatementCallback.
Starting with Spring Integration 4.2, the MessagePreparedStatementSetter
is available to allow
the specification of parameters on the PreparedStatement
manually, in the requestMessage
context.
This class plays exactly the same role as PreparedStatementSetter
in the standard Spring JDBC API.
Actually it is invoked directly from an inline PreparedStatementSetter
implementation, when the JdbcMessageHandler
invokes execute
on the JdbcTemplate
.
This functional interface option is mutually exclusive with sqlParameterSourceFactory
and can be used as a more
powerful alternative to populate parameters of the PreparedStatement
from the requestMessage
.
For example, it is useful when we need to store File data in a database BLOB column in a streaming manner:
@Bean @ServiceActivator(inputChannel = "storeFileChannel") public MessageHandler jdbcMessageHandler(DataSource dataSource) { JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource, "INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)"); jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> { ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME, String.class)); try (FileInputStream inputStream = new FileInputStream((File) m.getPayload())) { ps.setBlob(2, inputStream); } catch (Exception e) { throw new MessageHandlingException(m, e); } ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class))); }); return jdbcMessageHandler; }
From the XML configuration perspective, the prepared-statement-setter
attribute is available on the
<int-jdbc:outbound-channel-adapter>
component, to specify a MessagePreparedStatementSetter
bean reference.
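A minimal sketch of the XML variant follows. The `fileBlobSetter` bean and its class `com.example.FileBlobPreparedStatementSetter` are hypothetical; the class is assumed to implement MessagePreparedStatementSetter.

```xml
<!-- Hypothetical MessagePreparedStatementSetter implementation -->
<bean id="fileBlobSetter" class="com.example.FileBlobPreparedStatementSetter"/>

<!-- prepared-statement-setter replaces sql-parameter-source-factory; do not set both -->
<int-jdbc:outbound-channel-adapter
    query="INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)"
    channel="storeFileChannel" data-source="dataSource"
    prepared-statement-setter="fileBlobSetter"/>
```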
The outbound Gateway is like a combination of the outbound and inbound adapters: its role is to handle a message, use it to execute a SQL query, and then respond with the result by sending it to a reply channel. The message payload and headers are available by default as input parameters to the query, for instance:
<int-jdbc:outbound-gateway update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])" request-channel="input" reply-channel="output" data-source="dataSource" />
The result of the above would be to insert a record into the "foos" table and return a message to the output channel indicating the number of rows affected (the payload is a map: {UPDATED=1}
).
If the update query is an insert with auto-generated keys, the reply message can be populated with the generated keys by adding keys-generated="true"
to the above example (this is not the default because it is not supported by some database platforms).
For example:
<int-jdbc:outbound-gateway update="insert into foos (status, name) values (0, :payload[foo])" request-channel="input" reply-channel="output" data-source="dataSource" keys-generated="true"/>
Instead of the update count or the generated keys, you can also provide a select query to execute and generate a reply message from the result (like the inbound adapter), for example:
<int-jdbc:outbound-gateway update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])" query="select * from foos where id=:headers[$id]" request-channel="input" reply-channel="output" data-source="dataSource"/>
Since Spring Integration 2.2 the update SQL query is no longer mandatory. You can now solely provide a select query, using either the query attribute or the query sub-element. This is extremely useful if you need to actively retrieve data using e.g. a generic Gateway or a Payload Enricher. The reply message is then generated from the result, like the inbound adapter, and passed to the reply channel.
<int-jdbc:outbound-gateway query="select * from foos where id=:headers[id]" request-channel="input" reply-channel="output" data-source="dataSource"/>
Important | |
---|---|
By default, the component for the SELECT query returns only the first row from the cursor.
This can be adjusted with the |
As with the channel adapters, you can also provide SqlParameterSourceFactory
instances for request and reply.
The default is the same as for the outbound adapter, so the request message is available as the root of an expression.
If keys-generated="true"
then the root of the expression is the generated keys (a map if there is only one or a list of maps if multi-valued).
The outbound gateway requires a reference to either a DataSource or a JdbcTemplate.
It can also have a SqlParameterSourceFactory
injected to control the binding of the incoming message to the query.
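The following sketch shows separate parameter sources for the request (update) and reply (select) sides. It is an illustration under stated assumptions: the bean ids are hypothetical, and the reply-side attribute name `reply-sql-parameter-source-factory` is assumed by analogy with the request-side attribute, so verify it against the schema of your version.

```xml
<int-jdbc:outbound-gateway
    update="insert into foos (id, status, name) values (:id, 0, :name)"
    query="select * from foos where id = :id"
    request-channel="input" reply-channel="output" data-source="dataSource"
    request-sql-parameter-source-factory="requestSource"
    reply-sql-parameter-source-factory="replySource"/>

<!-- Binds the update parameters; #root is the request message -->
<bean id="requestSource"
      class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="id" value="headers['id'].toString()"/>
            <entry key="name" value="payload['foo']"/>
        </map>
    </property>
</bean>

<!-- Binds the select parameters; without keys-generated, #root is again the request message -->
<bean id="replySource"
      class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="id" value="headers['id'].toString()"/>
        </map>
    </property>
</bean>
```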
Starting with version 4.2, the request-prepared-statement-setter
attribute is available on the
<int-jdbc:outbound-gateway>
as an alternative to the request-sql-parameter-source-factory
.
It allows you to specify a MessagePreparedStatementSetter
bean reference, which implements more sophisticated
PreparedStatement
preparation before its execution.
See Section 19.2, “Outbound Channel Adapter” for more information about MessagePreparedStatementSetter
.
Spring Integration provides two JDBC-specific Message Store implementations.
The first one is the JdbcMessageStore, which is suitable for use with Aggregators and the Claim Check pattern.
While it can be used for backing Message Channels as well, you may want to consider using the JdbcChannelMessageStore
implementation instead, as it provides a more targeted and scalable implementation.
Before starting to use JDBC Message Store components, it is important to provision the target database with the appropriate objects.
Spring Integration ships with some sample scripts that can be used to initialize a database.
In the spring-integration-jdbc
JAR file you can find scripts in the org.springframework.integration.jdbc
package: there is a create and a drop script example for a range of common database platforms.
A common way to use these scripts is to reference them in a Spring JDBC data source initializer.
Note that the scripts are provided as samples or specifications of the required table and column names.
You may find that you need to enhance them for production use (e.g. with index declarations).
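A data source initializer that runs one of the sample scripts might look like the sketch below. It assumes an H2 database and a `dataSource` bean defined elsewhere; choose the script that matches your platform.

```xml
<!-- "jdbc" here is the Spring JDBC namespace (http://www.springframework.org/schema/jdbc) -->
<jdbc:initialize-database data-source="dataSource">
    <!-- Sample create script for H2 shipped in the spring-integration-jdbc JAR -->
    <jdbc:script location="classpath:org/springframework/integration/jdbc/schema-h2.sql"/>
</jdbc:initialize-database>
```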
The JDBC module provides an implementation of the Spring Integration MessageStore
(important in the Claim Check pattern) and MessageGroupStore
(important in stateful patterns like Aggregator) backed by a database.
Both interfaces are implemented by the JdbcMessageStore
, and there is also support for configuring store instances in XML.
For example:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
A JdbcTemplate
can be specified instead of a DataSource
.
Other optional attributes are shown in the next example:
<int-jdbc:message-store id="messageStore" data-source="dataSource" lob-handler="lobHandler" table-prefix="MY_INT_"/>
Here we have specified a LobHandler for dealing with messages as large objects (often necessary when using Oracle) and a prefix for the table names in the queries generated by the store.
The table name prefix defaults to INT_
.
If you intend to back Message Channels with JDBC, it is recommended to use the provided JdbcChannelMessageStore
implementation instead.
It can only be used in conjunction with Message Channels.
Supported Databases
The JdbcChannelMessageStore
uses database specific SQL queries to retrieve messages from the database.
Therefore, users must set the ChannelMessageStoreQueryProvider
property on the JdbcChannelMessageStore
.
This channelMessageStoreQueryProvider
provides the SQL queries and Spring Integration provides support for the following relational databases:
If your database is not listed, you can easily extend the AbstractChannelMessageStoreQueryProvider
class and provide your own custom queries.
Since version 4.0, the MESSAGE_SEQUENCE
column has been added to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.
Since version 5.0, you can provide a custom implementation for message insertion in the JdbcChannelMessageStore by extending the ChannelMessageStorePreparedStatementSetter class.
A custom implementation can use different columns, a different table structure, or a different serialization strategy.
For example, instead of the default serialization to byte[], the payload can be stored as a JSON string.
The following example uses the default implementation of setValues to store the common columns and overrides the behavior only to store the message payload as a varchar.
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter { public JsonPreparedStatementSetter() { super(); } @Override public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage, Object groupId, String region, boolean priorityEnabled) throws SQLException { // Populate common columns super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled); // Store message payload as varchar preparedStatement.setString(6, requestMessage.getPayload().toString()); } }
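Wiring such a setter into the store could look like the following sketch. It assumes that `dataSource` and `queryProvider` beans are defined elsewhere and that the class above lives in a hypothetical `com.example` package. Reading the messages back would need a matching row-mapping strategy, which is not shown here.

```xml
<bean id="jsonChannelStore"
      class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <!-- Custom insertion logic; the package name is hypothetical -->
    <property name="preparedStatementSetter">
        <bean class="com.example.JsonPreparedStatementSetter"/>
    </property>
</bean>
```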
Important | |
---|---|
Generally it is not recommended to use a relational database for the purpose of queuing. Instead, if possible, consider using either JMS or AMQP backed channels instead. For further reference please see the following resources: |
Concurrent Polling
When polling a Message Channel, you have the option to configure the associated Poller
with a TaskExecutor
reference.
Important | |
---|---|
Keep in mind, though, that if you use a JDBC backed Message Channel and you are planning on polling the channel and consequently the message store transactionally with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue and the performance, when using multiple threads, may not materialize as expected. For example Apache Derby is problematic in that regard. To achieve better JDBC queue throughput, and avoid issues when different threads may poll the same |
<bean id="queryProvider" class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/> <int:transaction-synchronization-factory id="syncFactory"> <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" /> <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/> </int:transaction-synchronization-factory> <task:executor id="pool" pool-size="10" queue-capacity="10" rejection-policy="CALLER_RUNS" /> <bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore"> <property name="dataSource" ref="dataSource"/> <property name="channelMessageStoreQueryProvider" ref="queryProvider"/> <property name="region" value="TX_TIMEOUT"/> <property name="usingIdCache" value="true"/> </bean> <int:channel id="inputChannel"> <int:queue message-store="store"/> </int:channel> <int:bridge input-channel="inputChannel" output-channel="outputChannel"> <int:poller fixed-delay="500" receive-timeout="500" max-messages-per-poll="1" task-executor="pool"> <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="transactionManager" /> </int:poller> </int:bridge> <int:channel id="outputChannel" />
Priority Channel
Starting with version 4.0, the JdbcChannelMessageStore
implements PriorityCapableChannelMessageStore
and provides the priorityEnabled
option allowing it to be used as a message-store
reference for priority-queue elements.
For this purpose, the INT_CHANNEL_MESSAGE table has a MESSAGE_PRIORITY column to store the value of the PRIORITY message header.
In addition, a new MESSAGE_SEQUENCE
column is also provided to achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond.
Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
.
Note | |
---|---|
It’s not recommended to use the same |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore"> <property name="dataSource" ref="dataSource"/> <property name="channelMessageStoreQueryProvider" ref="queryProvider"/> </bean> <int:channel id="queueChannel"> <int:queue message-store="channelStore"/> </int:channel> <bean id="priorityStore" parent="channelStore"> <property name="priorityEnabled" value="true"/> </bean> <int:channel id="priorityChannel"> <int:priority-queue message-store="priorityStore"/> </int:channel>
It is common to use a JdbcMessageStore
as a global store for a group of applications, or nodes in the same application.
To provide some protection against name clashes, and to give control over the database meta-data configuration, the message store allows the tables to be partitioned in two ways.
One is to use separate table names, by changing the prefix as described above, and the other is to specify a "region" name for partitioning data within a single table.
An important use case for this is when the MessageStore is managing persistent queues backing a Spring Integration Message Channel.
The message data for a persistent channel is keyed in the store on the channel name, so if the channel names are not globally unique then there is the danger of channels picking up data that was not intended for them.
To avoid this, the message store region can be used to keep data separate for different physical channels that happen to have the same logical name.
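As a sketch of region-based partitioning, the two stores below share the same tables but keep their data apart. The store ids and region names are hypothetical.

```xml
<!-- Same DataSource and table prefix, different regions -->
<int-jdbc:message-store id="ordersStore" data-source="dataSource" region="ORDERS"/>
<int-jdbc:message-store id="billingStore" data-source="dataSource" region="BILLING"/>

<!-- A channel named "input" backed by ordersStore does not see data that another
     application wrote through billingStore for its own "input" channel -->
<int:channel id="input">
    <int:queue message-store="ordersStore"/>
</int:channel>
```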
In certain situations plain JDBC support is not sufficient. Maybe you deal with legacy relational database schemas or you have complex data processing needs, but ultimately you have to use Stored Procedures or Stored Functions. Since Spring Integration 2.1, we provide three components to execute Stored Procedures or Stored Functions: the Stored Procedure Inbound Channel Adapter, the Stored Procedure Outbound Channel Adapter, and the Stored Procedure Outbound Gateway.
In order to enable calls to Stored Procedures and Stored Functions, the Stored Procedure components use the org.springframework.jdbc.core.simple.SimpleJdbcCall
class.
Consequently, the following databases are fully supported for executing Stored Procedures:
If you want to execute Stored Functions instead, the following databases are fully supported:
Note | |
---|---|
Even though your particular database may not be fully supported, chances are, that you can use the Stored Procedure Spring Integration components quite successfully anyway, provided your RDBMS supports Stored Procedures or Functions. As a matter of fact, some of the provided integration tests use the H2 database. Nevertheless, it is very important to thoroughly test those usage scenarios. |
The Stored Procedure components provide full XML Namespace support, and configuring them is similar to configuring the general-purpose JDBC components discussed earlier.
Certain configuration parameters are shared among all Stored Procedure components and are described below:
auto-startup
Lifecycle attribute signaling if this component should be started during Application Context startup.
Defaults to true
.
Optional.
data-source
Reference to a javax.sql.DataSource
, which is used to access the database. Required.
id
Identifies the underlying Spring bean definition, which is an instance of either EventDrivenConsumer
or PollingConsumer
, depending on whether the Outbound Channel Adapter’s channel
attribute references a SubscribableChannel
or a PollableChannel
.
Optional.
ignore-column-meta-data
For fully supported databases, the underlying SimpleJdbcCall class can automatically retrieve the parameter information for the Stored Procedure or Function to be invoked from the JDBC metadata.
However, if the database does not support metadata lookups, or if you want to provide customized parameter definitions, this flag can be set to true.
It defaults to false
.
Optional.
is-function
If true
, a SQL Function is called.
In that case the stored-procedure-name
or stored-procedure-name-expression
attributes define the name of the called function.
Defaults to false
.
Optional.
stored-procedure-name
The attribute specifies the name of the stored procedure.
If the is-function
attribute is set to true
, this attribute specifies the function name instead.
Either this property or stored-procedure-name-expression
must be specified.
stored-procedure-name-expression
This attribute specifies the name of the stored procedure using a SpEL expression. Using SpEL you have access to the full message (if available), including its headers and payload. You can use this attribute to invoke different Stored Procedures at runtime. For example, you can provide Stored Procedure names that you would like to execute as a Message Header. The expression must resolve to a String.
If the is-function
attribute is set to true
, this attribute specifies a Stored Function.
Either this property or stored-procedure-name must be specified.
jdbc-call-operations-cache-size
Defines the maximum number of cached SimpleJdbcCallOperations
instances.
For each Stored Procedure name, a new SimpleJdbcCallOperations instance is created and then cached.
Note | |
---|---|
The |
The default cache size is 10. A value of 0 disables caching. Negative values are not permitted.
If you enable JMX, statistical information about the jdbc-call-operations-cache
is exposed as an MBean.
Please see Section 10.2.7, “MBean Exporter” for more information.
sql-parameter-source-factory (Not available for the Stored Procedure Inbound Channel Adapter.)
Reference to a SqlParameterSourceFactory
.
By default bean properties of the passed in Message
payload will be used as a source for the Stored Procedure’s input parameters using a BeanPropertySqlParameterSourceFactory
.
This may be sufficient for basic use cases.
For more sophisticated options, consider passing in one or more ProcedureParameter
.
Please also refer to Section 19.5.5, “Defining Parameter Sources”.
Optional.
use-payload-as-parameter-source (Not available for the Stored Procedure Inbound Channel Adapter.)
If set to true
, the payload of the Message will be used as a source for providing parameters.
If false, however, the entire Message will be available as a source for parameters.
If no Procedure Parameters are passed in, this property will default to true
.
This means that using a default BeanPropertySqlParameterSourceFactory
the bean properties of the payload will be used as a source for parameter values for the to-be-executed Stored Procedure or Stored Function.
However, if Procedure Parameters are passed in, then this property will by default evaluate to false
.
ProcedureParameter instances allow SpEL expressions to be provided, so it is highly beneficial to have access to the entire Message.
The property is set on the underlying StoredProcExecutor
.
Optional.
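Several of the shared attributes above can be combined as in the following sketch. The channel name, the header name `procedureName`, and the cache size of 20 are hypothetical values chosen for illustration.

```xml
<!-- The procedure name is read from a message header at runtime -->
<int-jdbc:stored-proc-outbound-channel-adapter
    data-source="dataSource"
    channel="procedureRequestChannel"
    stored-procedure-name-expression="headers['procedureName']"
    jdbc-call-operations-cache-size="20"
    use-payload-as-parameter-source="true"/>
```

Because the procedure name can differ per message, a cache size that covers the number of distinct procedure names avoids recreating SimpleJdbcCallOperations instances.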
The Stored Procedure components share a common set of sub-elements to define and pass parameters to Stored Procedures or Functions. The following elements are available:
parameter
returning-resultset
sql-parameter-definition
poller
parameter
Provides a mechanism to supply Stored Procedure parameters. Parameters can be either static or provided using SpEL expressions. Optional.
<int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/>
The name of the parameter to be passed into the Stored Procedure or Stored Function. Required. | |
This attribute specifies the type of the value.
If nothing is provided this attribute will default to | |
The value of the parameter.
You have to provide either this attribute or the | |
Instead of the |
returning-resultset
Stored Procedures may return multiple result sets.
By setting one or more returning-resultset
elements, you can specify RowMappers
in order to convert each returned ResultSet
to meaningful objects.
Optional.
<int-jdbc:returning-resultset name="" row-mapper="" />
sql-parameter-definition
If you are using a database that is fully supported, you typically don’t have to specify the Stored Procedure parameter definitions.
Instead, those parameters can be automatically derived from the JDBC Meta-data.
However, if you are using databases that are not fully supported, you must set those parameters explicitly using the sql-parameter-definition
sub-element.
You can also choose to turn off any processing of parameter meta data information obtained via JDBC using the ignore-column-meta-data
attribute.
<int-jdbc:sql-parameter-definition name="" direction="IN" type="STRING" scale="5" type-name="FOO_STRUCT" return-type="fooSqlReturnType"/>
Specifies the name of the SQL parameter. Required. | |
Specifies the direction of the SQL parameter definition.
Defaults to | |
The SQL type used for this SQL parameter definition. Will translate into the integer value as defined by java.sql.Types. Alternatively you can provide the integer value as well. If this attribute is not explicitly set, then it will default to VARCHAR. Optional. | |
The scale of the SQL parameter. Only used for numeric and decimal parameters. Optional. | |
The typeName for types that are user-named like: | |
The reference to a custom value handler for complex types. An implementation of SqlReturnType. This attribute is mutually exclusive with the scale attribute and is applicable for OUT(INOUT)-parameters only. Optional. |
poller
Allows you to configure a Message Poller if this endpoint is a PollingConsumer
.
Optional.
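For a database without metadata support, explicit parameter definitions might be declared as in this sketch. It assumes a hypothetical INSERT_COFFEE procedure with three input parameters whose SQL types are guessed here for illustration.

```xml
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
    channel="insertCoffeeProcedureRequestChannel"
    stored-procedure-name="INSERT_COFFEE"
    ignore-column-meta-data="true">
    <!-- Explicit definitions replace the JDBC metadata lookup -->
    <int-jdbc:sql-parameter-definition name="id" direction="IN" type="INTEGER"/>
    <int-jdbc:sql-parameter-definition name="name" direction="IN" type="VARCHAR"/>
    <int-jdbc:sql-parameter-definition name="description" direction="IN" type="VARCHAR"/>
</int-jdbc:stored-proc-outbound-channel-adapter>
```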
Parameter Sources govern the techniques of retrieving and mapping the Spring Integration Message properties to the relevant Stored Procedure input parameters. The Stored Procedure components follow certain rules.
By default bean properties of the passed in Message
payload will be used as a source for the Stored Procedure’s input parameters.
In that case a BeanPropertySqlParameterSourceFactory
will be used.
This may be sufficient for basic use cases.
The following example illustrates that default behavior.
Important | |
---|---|
Please be aware that for the "automatic" lookup of bean properties using the |
Let’s assume we have a payload that consists of a simple bean with the following three properties: id, name and description. Furthermore, we have a simplistic Stored Procedure called INSERT_COFFEE that accepts three input parameters: id, name and description. We also use a fully supported database. In that case the following configuration for a Stored Procedure Outbound Adapter will be sufficient:
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource" channel="insertCoffeeProcedureRequestChannel" stored-procedure-name="INSERT_COFFEE"/>
For more sophisticated options consider passing in one or more ProcedureParameter
.
If you do provide ProcedureParameter
explicitly, then by default an ExpressionEvaluatingSqlParameterSourceFactory
will be used for parameter processing in order to enable the full power of SpEL expressions.
Furthermore, if you need even more control over how parameters are retrieved, consider passing in a custom implementation of a SqlParameterSourceFactory
using the sql-parameter-source-factory
attribute.
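A minimal sketch of explicit parameters, assuming the same INSERT_COFFEE procedure and a payload bean with id and name properties. The header name coffeeDescription is hypothetical and only illustrates that an expression can read from any part of the message.

```xml
<!-- Sketch only: the 'coffeeDescription' header is an assumption for illustration -->
<int-jdbc:stored-proc-outbound-channel-adapter
        data-source="dataSource"
        channel="insertCoffeeProcedureRequestChannel"
        stored-procedure-name="INSERT_COFFEE">
    <!-- Each expression is a SpEL expression evaluated against the message -->
    <int-jdbc:parameter name="id" expression="payload.id"/>
    <int-jdbc:parameter name="name" expression="payload.name.toUpperCase()"/>
    <int-jdbc:parameter name="description" expression="headers['coffeeDescription']"/>
</int-jdbc:stored-proc-outbound-channel-adapter>
```

Because the parameters are declared explicitly, the payload no longer needs to expose a property for every procedure argument.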
<int-jdbc:stored-proc-inbound-channel-adapter channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" is-function="false" skip-undeclared-results="" return-value-required="false"> <int:poller/> <int-jdbc:sql-parameter-definition name="" direction="IN" type="STRING" scale=""/> <int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/> <int-jdbc:returning-resultset name="" row-mapper="" /> </int-jdbc:stored-proc-inbound-channel-adapter>
Channel to which polled messages will be sent. If the stored procedure or function does not return any data, the payload of the Message will be Null. Required. | |
If this attribute is set to | |
Indicates whether this procedure’s return value should be included. Since Spring Integration 3.0. Optional. |
<int-jdbc:stored-proc-outbound-channel-adapter channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" order="" sql-parameter-source-factory="" use-payload-as-parameter-source=""> <int:poller fixed-rate=""/> <int-jdbc:sql-parameter-definition name=""/> <int-jdbc:parameter name=""/> </int-jdbc:stored-proc-outbound-channel-adapter>
The receiving Message Channel of this endpoint. Required. | |
Specifies the order for invocation when this endpoint is connected as a subscriber to a channel. This is particularly relevant when that channel is using a failover dispatching strategy. It has no effect when this endpoint itself is a Polling Consumer for a channel with a queue. Optional. |
<int-jdbc:stored-proc-outbound-gateway request-channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" is-function="false" order="" reply-channel="" reply-timeout="" return-value-required="false" skip-undeclared-results="" sql-parameter-source-factory="" use-payload-as-parameter-source=""> <int-jdbc:sql-parameter-definition name="" direction="IN" type="" scale="10"/> <int-jdbc:sql-parameter-definition name=""/> <int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/> <int-jdbc:returning-resultset name="" row-mapper="" /> </int-jdbc:stored-proc-outbound-gateway>
The receiving Message Channel of this endpoint. Required. | |
Message Channel to which replies should be sent, after receiving the database response. Optional. | |
Allows you to specify how long this gateway will wait for the reply message to be sent successfully before throwing an exception.
Keep in mind that when sending to a | |
Indicates whether this procedure’s return value should be included. Optional. | |
If the |
In the following two examples we call Apache Derby Stored Procedures.
The first example calls a Stored Procedure that returns a ResultSet
, and using a RowMapper
the data is converted into a domain object, which then becomes the Spring Integration message payload.
In the second sample we call a Stored Procedure that uses Output Parameters instead, in order to return data.
Note | |
---|---|
Please have a look at the Spring Integration Samples project. The project contains the Apache Derby example referenced here, as well as instructions on how to run it. The Spring Integration Samples project also provides an example using Oracle Stored Procedures. |
In the first example, we call a Stored Procedure named FIND_ALL_COFFEE_BEVERAGES that does not define any input parameters but which returns a ResultSet
.
In Apache Derby, Stored Procedures are implemented using Java. Here is the method signature followed by the corresponding SQL:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages) throws SQLException { ... }
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \ PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \ EXTERNAL NAME 'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';
In Spring Integration, you can now call this Stored Procedure using e.g.
a stored-proc-outbound-gateway
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all" data-source="dataSource" request-channel="findAllProcedureRequestChannel" expect-single-result="true" stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES"> <int-jdbc:returning-resultset name="coffeeBeverages" row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/> </int-jdbc:stored-proc-outbound-gateway>
In the second example, we call a Stored Procedure named FIND_COFFEE that has one input parameter. Instead of returning a ResultSet, an output parameter is used:
public static void findCoffee(int coffeeId, String[] coffeeDescription) throws SQLException { ... }
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \ PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \ 'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';
In Spring Integration, you can now call this Stored Procedure using e.g.
a stored-proc-outbound-gateway
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee" data-source="dataSource" request-channel="findCoffeeProcedureRequestChannel" skip-undeclared-results="true" stored-procedure-name="FIND_COFFEE" expect-single-result="true"> <int-jdbc:parameter name="ID" expression="payload" /> </int-jdbc:stored-proc-outbound-gateway>
Starting with version 4.3, the JdbcLockRegistry
is available.
Certain components (for example aggregator and resequencer) use a lock obtained from a LockRegistry
instance to ensure that only one thread is manipulating a group at a time.
The DefaultLockRegistry
performs this function within a single component; you can now configure an external lock registry on these components.
When used with a shared MessageGroupStore
, the JdbcLockRegistry
can be used to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread will generally be able to acquire the lock immediately. If a lock is released by a thread using a different registry instance, it can take up to 100ms to acquire the lock.
The JdbcLockRegistry
is based on the LockRepository
abstraction, where a DefaultLockRepository
implementation is present.
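The wiring described above could be sketched as follows. The bean ids, channel names, and the messageStore reference are assumptions for this example; the lock table is assumed to exist already, and since the repository talks to the database a transaction manager is assumed to be configured elsewhere in the context.

```xml
<!-- Sketch only: bean ids and channel names are hypothetical -->
<bean id="lockRepository"
      class="org.springframework.integration.jdbc.lock.DefaultLockRepository">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="lockRegistry"
      class="org.springframework.integration.jdbc.lock.JdbcLockRegistry">
    <constructor-arg ref="lockRepository"/>
</bean>

<!-- The aggregator uses the shared store and the JDBC-backed lock registry,
     so only one application instance manipulates a given group at a time -->
<int:aggregator input-channel="aggregatorInput"
                output-channel="aggregatorOutput"
                message-store="messageStore"
                lock-registry="lockRegistry"/>
```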
The database schema scripts are located in the org.springframework.integration.jdbc
package, organized by RDBMS vendor.
For example, the H2 DDL for the lock table looks like:
CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36), REGION VARCHAR(100), CLIENT_ID CHAR(36), CREATED_DATE TIMESTAMP NOT NULL, constraint LOCK_PK primary key (LOCK_KEY, REGION) );
The INT_
prefix can be changed according to the target database design requirements.
To do so, set the prefix
property on the DefaultLockRepository
bean definition.
Sometimes an application gets into a state where it cannot release a distributed lock, that is, remove the corresponding record from the database.
Such dead locks can then be expired by another application on its next locking invocation.
The timeToLive
(TTL) option on the DefaultLockRepository
is provided for this purpose.
You may also want to specify the CLIENT_ID
for the locks stored for a given DefaultLockRepository
instance.
In this case you can specify the id
to be associated with the DefaultLockRepository
as a constructor parameter.
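A minimal sketch of a DefaultLockRepository definition that combines these options. The id value, the MY_ prefix, and the 30 second TTL are hypothetical; the TTL is assumed here to be expressed in milliseconds, and the id is chosen to fit the CHAR(36) CLIENT_ID column.

```xml
<!-- Sketch only: all values below are illustrative assumptions -->
<bean id="lockRepository"
      class="org.springframework.integration.jdbc.lock.DefaultLockRepository">
    <constructor-arg index="0" ref="dataSource"/>
    <!-- Stored in the CLIENT_ID column for locks held through this repository -->
    <constructor-arg index="1" value="7b1c4c3e-0d7a-4c1e-9f6b-2a5d8e3f1a90"/>
    <!-- Replaces the default INT_ table prefix -->
    <property name="prefix" value="MY_"/>
    <!-- Locks older than this may be expired by other instances (assumed milliseconds) -->
    <property name="timeToLive" value="30000"/>
</bean>
```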
Starting with version 5.0, the JDBC MetadataStore
(Section 10.5, “Metadata Store”) implementation is available.
The JdbcMetadataStore
can be used to maintain metadata state across application restarts.
This MetadataStore
implementation can be used with adapters such as:
In order to configure these adapters to use the JdbcMetadataStore
, simply declare a Spring bean using the
bean name metadataStore. The Twitter Inbound Channel Adapter and the Feed Inbound Channel Adapter will both
automatically pick up and use the declared JdbcMetadataStore
:
@Bean public MetadataStore metadataStore(DataSource dataSource) { return new JdbcMetadataStore(dataSource); }
Database schema scripts for several RDBMS vendors are located in the org.springframework.integration.jdbc
package.
For example, the H2 DDL for the metadata table looks like:
CREATE TABLE INT_METADATA_STORE ( METADATA_KEY VARCHAR(255) NOT NULL, METADATA_VALUE VARCHAR(4000), REGION VARCHAR(100) NOT NULL, constraint METADATA_STORE primary key (METADATA_KEY, REGION) );
The INT_
prefix can be changed according to the target data base design requirements and the JdbcMetadataStore
can be configured to use the custom prefix.
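As a sketch, a custom prefix could be set on the store through its tablePrefix property. The MY_ value is hypothetical, and a table created with that prefix in place of INT_ is assumed to exist.

```xml
<!-- Sketch only: the MY_ prefix is an illustrative assumption -->
<bean id="metadataStore"
      class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
    <constructor-arg ref="dataSource"/>
    <property name="tablePrefix" value="MY_"/>
</bean>
```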
The JdbcMetadataStore
implements ConcurrentMetadataStore
, allowing it to be reliably shared across multiple
application instances where only one instance will be allowed to store or modify a key’s value.
All of these operations are atomic via transaction guarantees.
Transaction management is required to use JdbcMetadataStore
.
Inbound Channel Adapters can be supplied with a reference to the TransactionManager
in the poller configuration.
Unlike non-transactional MetadataStore
implementations, with JdbcMetadataStore
, the entry appears in the target table only after the transaction commits.
When a rollback occurs, no entries are added to the INT_METADATA_STORE
table.
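One way to supply the transaction manager is through a transactional poller, sketched below for a Feed Inbound Channel Adapter. The adapter id, channel name, feed URL, and polling rate are assumptions for this example, and a bean named metadataStore of type JdbcMetadataStore is assumed to be declared elsewhere.

```xml
<!-- Sketch only: ids, channel, URL and rate are hypothetical -->
<bean id="transactionManager"
      class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>

<int-feed:inbound-channel-adapter id="feedAdapter"
                                  channel="feedChannel"
                                  url="https://example.org/feed.rss">
    <!-- Each poll runs in a transaction, so metadata entries become visible on commit -->
    <int:poller fixed-rate="10000">
        <int:transactional transaction-manager="transactionManager"/>
    </int:poller>
</int-feed:inbound-channel-adapter>
```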
Since version 5.0.7, the JdbcMetadataStore
can be configured with the RDBMS vendor-specific lockHint
option for lock-based queries on metadata store entries.
It is FOR UPDATE
by default and can be set to an empty string if the target database does not support row locking.
Consult your vendor's documentation for the specific hints available in the SELECT
expression for locking rows before updates.
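For a database without row locking support, the hint could be cleared as in the following sketch. The bean id is the conventional metadataStore name used above; any other hint value would be vendor-specific and is not shown here.

```xml
<!-- Sketch only: an empty lockHint removes the default FOR UPDATE clause -->
<bean id="metadataStore"
      class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
    <constructor-arg ref="dataSource"/>
    <property name="lockHint" value=""/>
</bean>
```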