Spring Integration provides Channel Adapters for receiving and sending messages via database queries. Through those adapters Spring Integration supports not only plain JDBC SQL Queries, but also Stored Procedure and Stored Function calls.
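The following JDBC components are available by default: the Inbound Channel Adapter, the Outbound Channel Adapter, the Outbound Gateway, the Stored Procedure Inbound Channel Adapter, the Stored Procedure Outbound Channel Adapter, and the Stored Procedure Outbound Gateway.
Furthermore, the Spring Integration JDBC Module also provides a JDBC Message Store.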
The main function of an inbound Channel Adapter is to execute a SQL SELECT query and turn the result set into a message. The message payload is the whole result set, expressed as a List, and the types of the items in the list depend on the row-mapping strategy that is used. The default strategy is a generic mapper that just returns a Map for each row in the query result. Optionally, this can be changed by adding a reference to a RowMapper instance (see the Spring JDBC documentation for more detailed information about row mapping).
Note | |
---|---|
If you want to convert rows in the SELECT query result to individual messages you can use a downstream splitter. |
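As a rough illustration of the two options above, the following sketch maps each row to a domain object with a custom RowMapper and then splits the resulting list into one message per row. The class com.example.ItemRowMapper and the channel names rows and items are hypothetical, the int, int-jdbc and beans namespaces are assumed to be declared, and the items channel is assumed to be defined elsewhere.

```xml
<!-- Hypothetical RowMapper implementation; replace with your own class -->
<bean id="itemRowMapper" class="com.example.ItemRowMapper"/>

<int:channel id="rows"/>

<!-- Each poll produces one message whose payload is a List of mapped items -->
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="rows" data-source="dataSource" row-mapper="itemRowMapper">
    <int:poller fixed-rate="5000"/>
</int-jdbc:inbound-channel-adapter>

<!-- The default splitter turns a List payload into one message per element -->
<int:splitter input-channel="rows" output-channel="items"/>
```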
The inbound adapter also requires a reference to either a JdbcTemplate instance or a DataSource.
In addition to the SELECT statement that generates the messages, the adapter can take an UPDATE statement that marks the records as processed so that they do not show up in the next poll. The update can be parameterized by the list of ids from the original select. This is done through a naming convention by default (a column in the input result set called "id" is translated into a list in the parameter map for the update called "id"). The following example defines an inbound Channel Adapter with an update query and a DataSource reference.
<int-jdbc:inbound-channel-adapter query="select * from item where status=2" channel="target" data-source="dataSource" update="update item set status=10 where id in (:id)" />
Note | |
---|---|
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which in this case is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC combined with a convention (projection onto the polled result list) adopted in Spring Integration. The underlying Spring JDBC features limit the available expressions (e.g. most special characters other than period are disallowed), but since the target is usually a list of or an individual object addressable by simple bean paths this isn't unduly restrictive. |
To change the parameter generation strategy you can inject a SqlParameterSourceFactory into the adapter to override the default behavior (the adapter has a sql-parameter-source-factory attribute).
The inbound adapter accepts a regular Spring Integration poller as a sub element, so for instance the frequency of the polling can be controlled. A very important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, for example:
<int-jdbc:inbound-channel-adapter query="..." channel="target" data-source="dataSource" update="..."> <int:poller fixed-rate="1000"> <int:transactional/> </int:poller> </int-jdbc:inbound-channel-adapter>
Note | |
---|---|
If a poller is not explicitly specified, a default value will be used (and as per normal with Spring Integration can be defined as a top level bean). |
In this example the database is polled every 1000 milliseconds, and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown, but as long as it is aware of the data source then the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread, and hence the same transaction. Then if any of them fail, the transaction rolls back and the input data is reverted to its original state.
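The transaction manager that the text leaves out could look roughly like the sketch below. It assumes a plain JDBC DataSourceTransactionManager bound to the same dataSource bean that the adapter uses, and it relies on the transactional element looking up a bean named transactionManager when no other name is given. The top-level default poller is included only to illustrate the note above; its id is an arbitrary choice.

```xml
<!-- Transaction manager for the DataSource that the adapter polls.
     The bean name "transactionManager" is the one <int:transactional/>
     looks for unless its transaction-manager attribute names another bean. -->
<bean id="transactionManager"
      class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>

<!-- Optional top-level default poller, used by endpoints that declare none -->
<int:poller id="defaultPoller" default="true" fixed-rate="1000"/>
```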
The outbound Channel Adapter is the inverse of the inbound: its role is to handle a message and use it to execute a SQL query. The message payload and headers are available by default as input parameters to the query, for instance:
<int-jdbc:outbound-channel-adapter query="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])" data-source="dataSource" channel="input"/>
In the example above, messages arriving on the channel labelled input have a payload of a map with key foo, so the [] operator dereferences that value from the map. The headers are also accessed as a map.
Note | |
---|---|
The parameters in the query above are bean property expressions on the incoming message (not Spring EL expressions). This behavior is part of the SqlParameterSource which is the default source created by the outbound adapter. Other behavior is possible in the adapter, and requires the user to inject a different SqlParameterSourceFactory. |
The outbound adapter requires a reference to either a DataSource or a JdbcTemplate. It can also have a SqlParameterSourceFactory injected to control the binding of each incoming message to a query.
If the input channel is a direct channel, then the outbound adapter runs its query in the same thread, and therefore the same transaction (if there is one) as the sender of the message.
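For the JdbcTemplate alternative mentioned above, a minimal sketch might look as follows. It assumes the adapter's jdbc-operations attribute is used in place of data-source; the bean id jdbcTemplate is an arbitrary choice.

```xml
<!-- A JdbcTemplate built on the existing dataSource bean -->
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <constructor-arg ref="dataSource"/>
</bean>

<!-- Same insert as in the earlier example, but wired to the template
     instead of directly to the DataSource -->
<int-jdbc:outbound-channel-adapter
    query="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    jdbc-operations="jdbcTemplate"
    channel="input"/>
```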
Passing Parameters using SpEL Expressions
A common requirement for most JDBC Channel Adapters is to pass parameters as part of SQL queries or Stored Procedures/Functions. As mentioned above, these parameters are by default bean property expressions, not SpEL expressions. However, if you need to pass SpEL expressions as parameters, you must inject a SqlParameterSourceFactory explicitly.
The following example uses an ExpressionEvaluatingSqlParameterSourceFactory to achieve that requirement.
<int-jdbc:outbound-channel-adapter data-source="dataSource" channel="input" query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)" sql-parameter-source-factory="spelSource"/> <bean id="spelSource" class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="parameterExpressions"> <map> <entry key="id" value="headers['id'].toString()"/> <entry key="createdDate" value="new java.util.Date()"/> <entry key="payload" value="payload"/> </map> </property> </bean>
For further information, please also see Section 17.5.5, “Defining Parameter Sources”.
The outbound Gateway is like a combination of the outbound and inbound adapters: its role is to handle a message, use it to execute a SQL query, and then respond with the result by sending it to a reply channel. The message payload and headers are available by default as input parameters to the query, for instance:
<int-jdbc:outbound-gateway update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])" request-channel="input" reply-channel="output" data-source="dataSource" />
The result of the above would be to insert a record into the "foos" table and return a message to the output channel indicating the number of rows affected (the payload is a map: {UPDATED=1}).
If the update query is an insert with auto-generated keys, the reply message can be populated with the generated keys by adding keys-generated="true" to the above example (this is not the default because it is not supported by some database platforms). For example:
<int-jdbc:outbound-gateway update="insert into foos (status, name) values (0, :payload[foo])" request-channel="input" reply-channel="output" data-source="dataSource" keys-generated="true"/>
Instead of the update count or the generated keys, you can also provide a select query to execute and generate a reply message from the result (like the inbound adapter), for example:
<int-jdbc:outbound-gateway update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])" query="select * from foos where id=:headers[id]" request-channel="input" reply-channel="output" data-source="dataSource"/>
As with the channel adapters, there is also the option to provide SqlParameterSourceFactory instances for request and reply. The default is the same as for the outbound adapter, so the request message is available as the root of an expression. If keys-generated="true" then the root of the expression is the generated keys (a map if there is only one or a list of maps if multi-valued).
The outbound gateway requires a reference to either a DataSource or a JdbcTemplate. It can also have a SqlParameterSourceFactory injected to control the binding of the incoming message to the query.
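The separate request and reply factories could be wired roughly as shown below. This is a sketch under assumptions: it assumes the gateway exposes a reply-sql-parameter-source-factory attribute (with a request-sql-parameter-source-factory counterpart), and the bean id replySource and the parameter name id are chosen only for illustration.

```xml
<!-- Resolves :id in the select query from the request message's id header.
     The request message is the expression root because keys-generated is not set. -->
<bean id="replySource"
      class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="id" value="headers['id']"/>
        </map>
    </property>
</bean>

<int-jdbc:outbound-gateway
    update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    query="select * from foos where id=:id"
    request-channel="input"
    reply-channel="output"
    data-source="dataSource"
    reply-sql-parameter-source-factory="replySource"/>
```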
The JDBC module provides an implementation of the Spring Integration MessageStore (important in the Claim Check pattern) and MessageGroupStore (important in stateful patterns like Aggregator) backed by a database. Both interfaces are implemented by the JdbcMessageStore, and there is also support for configuring store instances in XML. For example:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
A JdbcTemplate can be specified instead of a DataSource.
Other optional attributes are shown in the next example:
<int-jdbc:message-store id="messageStore" data-source="dataSource" lob-handler="lobHandler" table-prefix="MY_INT_"/>
Here we have specified a LobHandler for dealing with messages as large objects (e.g. often necessary if using Oracle) and a prefix for the table names in the queries generated by the store. The table name prefix defaults to "INT_".
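The lobHandler reference in the example above has to point at a bean defined somewhere. A minimal sketch, assuming Spring's stock DefaultLobHandler is adequate for the database in use, could be:

```xml
<!-- General-purpose LobHandler from Spring JDBC; some platforms or drivers
     may need a different implementation or extra settings -->
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>
```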
Spring Integration ships with some sample scripts that can be used to initialize a database. In the spring-integration-jdbc JAR file you will find scripts in the org.springframework.integration.jdbc package: there is a create and a drop script example for a range of common database platforms. A common way to use these scripts is to reference them in a Spring JDBC data source initializer. Note that the scripts are provided as samples or specifications of the required table and column names. You may find that you need to enhance them for production use (e.g. with index declarations).
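Referencing one of the shipped scripts from a data source initializer might look like the following sketch. It assumes the Spring jdbc namespace is declared and that the H2 create script is named schema-h2.sql; check the JAR for the exact file name matching your platform and version.

```xml
<!-- Runs the create script against the existing dataSource bean at startup.
     The script name is an assumption; list the package contents to confirm it. -->
<jdbc:initialize-database data-source="dataSource">
    <jdbc:script location="classpath:org/springframework/integration/jdbc/schema-h2.sql"/>
</jdbc:initialize-database>
```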
It is common to use a JdbcMessageStore as a global store for a group of applications, or nodes in the same
application. To provide some protection against name clashes, and to
give control over the database meta-data configuration, the message
store allows the tables to be partitioned in two ways. One is to use
separate table names, by changing the prefix as described above, and the
other is to specify a "region" name for partitioning data within a
single table. An important use case for this is when the MessageStore is
managing persistent queues backing a Spring Integration Message Channel. The
message data for a persistent channel is keyed in the store on the
channel name, so if the channel names are not globally unique then there
is the danger of channels picking up data that was not intended for
them. To avoid this, the message store region can be used to keep data
separate for different physical channels that happen to have the same
logical name.
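Partitioning by region and backing a channel with the store could be sketched as below. The region value ORDERS_APP and the bean id ordersStore are made up for the example, and the region attribute on the message-store element is assumed to be available in the namespace version in use.

```xml
<!-- Rows written by this store are tagged with the region, so another
     application sharing the same tables does not see them -->
<int-jdbc:message-store id="ordersStore" data-source="dataSource" region="ORDERS_APP"/>

<!-- A persistent queue channel; its messages are keyed on the channel name
     within the store's region -->
<int:channel id="input">
    <int:queue message-store="ordersStore"/>
</int:channel>
```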
In certain situations plain JDBC support is not sufficient. Maybe you deal with legacy relational database schemas or you have complex data processing needs, but ultimately you have to use Stored Procedures or Stored Functions. Since Spring Integration 2.1, we provide three components in order to execute Stored Procedures or Stored Functions: a Stored Procedure Inbound Channel Adapter, a Stored Procedure Outbound Channel Adapter, and a Stored Procedure Outbound Gateway.
In order to enable calls to Stored Procedures and Stored Functions, the Stored Procedure components use the org.springframework.jdbc.core.simple.SimpleJdbcCall class. Consequently, the following databases are fully supported for executing Stored Procedures:
If you want to execute Stored Functions instead, the following databases are fully supported:
Note | |
---|---|
Even though your particular database may not be fully supported, chances are that you can use the Stored Procedure Spring Integration components quite successfully anyway, provided your RDBMS supports Stored Procedures or Functions. As a matter of fact, some of the provided integration tests use the H2 database. Nevertheless, it is very important to thoroughly test those usage scenarios. |
The Stored Procedure components provide full XML Namespace support, and configuring them is similar to configuring the general purpose JDBC components discussed earlier.
Certain configuration parameters are shared among all Stored Procedure components and are described below:
auto-startup
Lifecycle attribute signaling if this component should be started during Application Context startup. Defaults to true. Optional.
data-source
Reference to a javax.sql.DataSource, which is used to access the database. Required.
id
Identifies the underlying Spring bean definition, which is an instance of either EventDrivenConsumer or PollingConsumer, depending on whether the Outbound Channel Adapter's channel attribute references a SubscribableChannel or a PollableChannel. Optional.
ignore-column-meta-data
For fully supported databases, the underlying SimpleJdbcCall class can automatically retrieve the parameter information for the Stored Procedure or Function to be invoked from the JDBC meta-data. However, if the database does not support meta-data lookups or if you would like to provide customized parameter definitions, this flag can be set to true. It defaults to false. Optional.
is-function
If true, a SQL Function is called. In that case the stored-procedure-name attribute defines the name of the called function. Defaults to false. Optional.
stored-procedure-name
The attribute specifies the name of the stored procedure. If the is-function attribute is set to true, this attribute specifies the function name. Required.
sql-parameter-source-factory (Not available for the Stored Procedure Inbound Channel Adapter.)
Reference to a SqlParameterSourceFactory. By default bean properties of the passed in Message payload will be used as a source for the Stored Procedure's input parameters using a BeanPropertySqlParameterSourceFactory. This may be sufficient for basic use cases. For more sophisticated options, consider passing in one or more ProcedureParameter instances. Please also refer to Section 17.5.5, “Defining Parameter Sources”. Optional.
use-payload-as-parameter-source (Not available for the Stored Procedure Inbound Channel Adapter.)
If set to true, the payload of the Message will be used as a source for providing parameters. If false, however, the entire Message will be available as a source for parameters. If no Procedure Parameters are passed in, this property will default to true. This means that using a default BeanPropertySqlParameterSourceFactory the bean properties of the payload will be used as a source for parameter values for the to-be-executed Stored Procedure or Stored Function. However, if Procedure Parameters are passed in, then this property will by default evaluate to false. ProcedureParameter instances allow for SpEL Expressions to be provided and therefore it is highly beneficial to have access to the entire Message. The property is set on the underlying StoredProcExecutor. Optional.
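To show how the shared attributes fit together, here is a sketch of a gateway that calls a Stored Function. The function name GET_COFFEE_PRICE, the bean id and the channel names are hypothetical; with no parameter sub-elements present, the payload's bean properties are used as parameter values, as described above.

```xml
<!-- Calls a (hypothetical) Stored Function instead of a Stored Procedure.
     Parameter definitions are read from the JDBC meta-data because
     ignore-column-meta-data keeps its default of false. -->
<int-jdbc:stored-proc-outbound-gateway
    id="priceLookup"
    request-channel="priceRequests"
    reply-channel="priceReplies"
    data-source="dataSource"
    stored-procedure-name="GET_COFFEE_PRICE"
    is-function="true"
    ignore-column-meta-data="false"
    auto-startup="true"/>
```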
The Stored Procedure components share a common set of sub-elements to define and pass parameters to Stored Procedures or Functions. The following elements are available:
parameter
Provides a mechanism for supplying Stored Procedure parameters. Parameters can be either static or provided using SpEL Expressions. Optional.
<int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/>
Attribute | Description |
---|---|
name | The name of the parameter to be passed into the Stored Procedure or Stored Function. Required. |
type | This attribute specifies the type of the value. A default type is used if nothing is provided. |
value | The value of the parameter. You have to provide either this attribute or the expression attribute. |
expression | Used instead of the value attribute to supply the parameter value as a SpEL expression. |
returning-resultset
Stored Procedures may return multiple resultsets. By setting one or more returning-resultset elements, you can specify RowMappers in order to convert each returned ResultSet to meaningful objects. Optional.
<int-jdbc:returning-resultset name="" row-mapper="" />
sql-parameter-definition
If you are using a database that is fully supported, you typically don't have to specify the Stored Procedure parameter definitions. Instead, those parameters can be automatically derived from the JDBC Meta-data. However, if you are using databases that are not fully supported, you must set those parameters explicitly using the sql-parameter-definition sub-element. You can also choose to turn off any processing of parameter meta data information obtained via JDBC using the ignore-column-meta-data attribute.
<int-jdbc:sql-parameter-definition name="" direction="IN" type="STRING" scale=""/>
Attribute | Description |
---|---|
name | Specifies the name of the SQL parameter. Required. |
direction | Specifies the direction of the SQL parameter definition. A default direction applies if the attribute is omitted. |
type | The SQL type used for this SQL parameter definition. Will translate into the integer value as defined by java.sql.Types. Alternatively you can provide the integer value as well. If this attribute is not explicitly set, then it will default to 'VARCHAR'. Optional. |
scale | The scale of the SQL parameter. Only used for numeric and decimal parameters. Optional. |
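For a database without full meta-data support, explicit definitions could be combined with ignore-column-meta-data as in this sketch. The procedure GET_PRICE, its parameters ID and PRICE, and the channel names are hypothetical; the type values are names from java.sql.Types.

```xml
<int-jdbc:stored-proc-outbound-gateway
    request-channel="priceRequests"
    reply-channel="priceReplies"
    data-source="dataSource"
    stored-procedure-name="GET_PRICE"
    ignore-column-meta-data="true">
    <!-- Declared by hand because meta-data lookup is switched off -->
    <int-jdbc:sql-parameter-definition name="ID" direction="IN" type="INTEGER"/>
    <int-jdbc:sql-parameter-definition name="PRICE" direction="OUT" type="DECIMAL" scale="2"/>
    <!-- The IN parameter takes its value from the message payload -->
    <int-jdbc:parameter name="ID" expression="payload"/>
</int-jdbc:stored-proc-outbound-gateway>
```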
poller
Allows you to configure a Message Poller if this endpoint is a PollingConsumer. Optional.
Parameter Sources govern the techniques of retrieving and mapping the Spring Integration Message properties to the relevant Stored Procedure input parameters. The Stored Procedure components follow certain rules.
By default bean properties of the passed in Message payload will be used as a source for the Stored Procedure's input parameters. In that case a BeanPropertySqlParameterSourceFactory will be used. This may be sufficient for basic use cases. The following example illustrates that default behavior.
Important | |
---|---|
Please be aware that for the "automatic" lookup of bean properties using the BeanPropertySqlParameterSourceFactory to work, your bean properties must be defined in lower case. This is due to the fact that in org.springframework.jdbc.core.metadata.CallMetaDataContext (method matchInParameterValuesWithCallParameters()), the retrieved Stored Procedure parameter declarations are converted to lower case. As a result, if you have camel-case bean properties such as "lastName", the lookup will fail. In that case, please provide an explicit ProcedureParameter. |
Let's assume we have a payload that consists of a simple bean with the following three properties: id, name and description. Furthermore, we have a simplistic Stored Procedure called INSERT_COFFEE that accepts three input parameters: id, name and description. We also use a fully supported database. In that case the following configuration for a Stored Procedure Outbound Adapter will be sufficient:
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource" channel="insertCoffeeProcedureRequestChannel" stored-procedure-name="INSERT_COFFEE"/>
For more sophisticated options consider passing in one or more ProcedureParameter instances. If you do provide ProcedureParameter instances explicitly, then by default an ExpressionEvaluatingSqlParameterSourceFactory will be used for parameter processing in order to enable the full power of SpEL expressions.
Furthermore, if you need even more control over how parameters are retrieved, consider passing in a custom implementation of a SqlParameterSourceFactory using the sql-parameter-source-factory attribute.
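Explicit parameters are one way around the camel-case limitation described earlier. The sketch below assumes a hypothetical procedure INSERT_CUSTOMER with parameters LAST_NAME and SOURCE, and a payload bean exposing a lastName property; none of these names come from the framework.

```xml
<int-jdbc:stored-proc-outbound-channel-adapter
    data-source="dataSource"
    channel="insertCustomerRequestChannel"
    stored-procedure-name="INSERT_CUSTOMER">
    <!-- SpEL expression evaluated against the whole Message -->
    <int-jdbc:parameter name="LAST_NAME" expression="payload.lastName"/>
    <!-- Static value, the same for every message -->
    <int-jdbc:parameter name="SOURCE" value="web"/>
</int-jdbc:stored-proc-outbound-channel-adapter>
```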
<int-jdbc:stored-proc-inbound-channel-adapter channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" is-function="false" max-rows-per-poll="" skip-undeclared-results=""> <int:poller/> <int-jdbc:sql-parameter-definition name="" direction="IN" type="STRING" scale=""/> <int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/> <int-jdbc:returning-resultset name="" row-mapper="" /> </int-jdbc:stored-proc-inbound-channel-adapter>
Attribute | Description |
---|---|
channel | Channel to which polled messages will be sent. If the stored procedure or function does not return any data, the payload of the Message will be null. Required. |
max-rows-per-poll | Limits the number of rows extracted per query. Otherwise all rows are extracted into the outgoing message. Optional. |
skip-undeclared-results | Controls whether results that the Stored Procedure returns without having declared them are skipped. For example, Stored Procedures may return an update count value even though your Stored Procedure only declared a single result parameter. The exact behavior depends on the database used. Few developers will probably ever want to process update counts, so undeclared results are skipped by default. |
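A filled-in version of the template above might look like the following sketch. It reuses the FIND_ALL_COFFEE_BEVERAGES procedure and the row mapper class from the Apache Derby example later in this section; the channel name and the polling interval are arbitrary choices.

```xml
<!-- Polls the procedure every five seconds and publishes the mapped
     result set as the message payload -->
<int-jdbc:stored-proc-inbound-channel-adapter
    channel="coffeeBeverages"
    stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES"
    data-source="dataSource"
    skip-undeclared-results="true">
    <int:poller fixed-rate="5000"/>
    <int-jdbc:returning-resultset name="coffeeBeverages"
        row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-inbound-channel-adapter>
```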
<int-jdbc:stored-proc-outbound-channel-adapter channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" order="" return-value-required="false" sql-parameter-source-factory="" use-payload-as-parameter-source=""> <int:poller fixed-rate=""/> <int-jdbc:sql-parameter-definition name=""/> <int-jdbc:parameter name=""/> </int-jdbc:stored-proc-outbound-channel-adapter>
Attribute | Description |
---|---|
channel | The receiving Message Channel of this endpoint. Required. |
order | Specifies the order for invocation when this endpoint is connected as a subscriber to a channel. This is particularly relevant when that channel is using a failover dispatching strategy. It has no effect when this endpoint itself is a Polling Consumer for a channel with a queue. Optional. |
return-value-required | Indicates whether this procedure's return value should be included. Optional. |
<int-jdbc:stored-proc-outbound-gateway request-channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" is-function="false" order="" reply-channel="" reply-timeout="" return-value-required="false" skip-undeclared-results="" sql-parameter-source-factory="" use-payload-as-parameter-source=""> <int-jdbc:sql-parameter-definition name="" direction="IN" type="" scale="10"/> <int-jdbc:sql-parameter-definition name=""/> <int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/> <int-jdbc:returning-resultset name="" row-mapper="" /> </int-jdbc:stored-proc-outbound-gateway>
Attribute | Description |
---|---|
request-channel | The receiving Message Channel of this endpoint. Required. |
reply-channel | Message Channel to which replies should be sent, after receiving the database response. Optional. |
reply-timeout | Allows you to specify how long this gateway will wait for the reply message to be sent successfully before throwing an exception. |
return-value-required | Indicates whether this procedure's return value should be included. Optional. |
skip-undeclared-results | Controls whether results that the Stored Procedure returns without having declared them are skipped. For example, Stored Procedures may return an update count value even though your Stored Procedure only declared a single result parameter. The exact behavior depends on the database used. Few developers will probably ever want to process update counts, so undeclared results are skipped by default. |
In the following two examples we call Apache Derby Stored Procedures. The first example calls a Stored Procedure that returns a ResultSet; using a RowMapper, the data is converted into a domain object, which then becomes the Spring Integration message payload.
In the second sample we call a Stored Procedure that uses Output Parameters instead, in order to return data.
Note | |
---|---|
Please have a look at the Spring Integration Samples project, located at https://github.com/SpringSource/spring-integration-samples. The project contains the Apache Derby example referenced here, as well as instructions on how to run it. The Spring Integration Samples project also provides an example using Oracle Stored Procedures. |
In the first example, we call a Stored Procedure named FIND_ALL_COFFEE_BEVERAGES that does not define any input parameters but which returns a ResultSet.
In Apache Derby, Stored Procedures are implemented using Java. Here is the method signature followed by the corresponding SQL:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages) throws SQLException { ... }
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 EXTERNAL NAME 'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';
In Spring Integration, you can now call this Stored Procedure using, for example, a stored-proc-outbound-gateway:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all" data-source="dataSource" request-channel="findAllProcedureRequestChannel" expect-single-result="true" stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES"> <int-jdbc:returning-resultset name="coffeeBeverages" row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/> </int-jdbc:stored-proc-outbound-gateway>
In the second example, we call a Stored Procedure named FIND_COFFEE that has one input parameter. Instead of returning a ResultSet, an output parameter is used:
public static void findCoffee(int coffeeId, String[] coffeeDescription) throws SQLException { ... }
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME 'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';
In Spring Integration, you can now call this Stored Procedure using, for example, a stored-proc-outbound-gateway:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee" data-source="dataSource" request-channel="findCoffeeProcedureRequestChannel" skip-undeclared-results="true" stored-procedure-name="FIND_COFFEE" expect-single-result="true"> <int-jdbc:parameter name="ID" expression="payload" /> </int-jdbc:stored-proc-outbound-gateway>