Spring Integration provides Channel Adapters for receiving and sending messages via database queries.
The main function of an inbound Channel Adapter is to execute a SQL
SELECT
query and turn the result set into a message. The
message payload is the whole result set, expressed as a
List
, and the types of the items in the list
depends on the row-mapping strategy that is used. The default strategy is
a generic mapper that just returns a Map
for each
row i nthe query. Optionally this can be changed by adding a reference to
requires a reference to a RowMapper
instance (see
the Spring
JDBC documentation for more detailed information about row
mapping).
Note | |
---|---|
If you want to convert rows in the SELECT query result to individual messages you can use a downstream splitter. |
The inbound adapter also requires a reference to either
JdbcTemplate
instance or
DataSource
.
As well as the SELECT
statement to generate the
messages, the adapter above also has an UPDATE
statement that
is being used to mark the records as processed, so they don't show up in
the next poll. The update can be parameterised by the list of ids from the
original select. This is done through a naming convention by default (a
column in the input result set called "id" is translated into a list in
the parameter map for the update called "id"). The following example
defines an inbound Channel Adapter with an update query and a
DataSource
reference.
<jdbc:inbound-channel-adapter query="select * from item where status=2" channel="target" data-source="dataSource" update="update item set status=10 where id in (:id)" />
Note | |
---|---|
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which in this case is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC combined with a convention (projection onto the polled result list) adopted in Spring Integration. The underlying Spring JDBC features limit the available expressions (e.g. most special characters other than period are disallowed), but since the target is usually a list of or an individual object addressable by simple bean paths this isn't unduly restrictive. |
To change the parameter generation strategy you can inject a
SqlParameterSourceFactory
into the adapter to
override the default behaviour (the adapter has a
sql-parameter-source-factory
attribute).
The inbound adapter accepts a regular Spring Integration poller as a sub element, so for instance the frequency of the polling can be controlled. A very important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, for example:
<jdbc:inbound-channel-adapter query="..." channel="target" data-source="dataSource" update="..."> <poller fixed-rate"1000"> <transactional/> </poller> </jdbc:inbound-channel-adapter>
Note | |
---|---|
If a poller is not explicitly specified a default value will be used (and as per normal with Spring Integration can be defined as a top level bean) |
In this example the database is polled every 1000 milliseconds, and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown, but as long as it is aware of the data source then the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread, and hence the same transaction. then if any of them fails, the transaction rolls back and the input data are reverted to their original state.
The outbound Channel Adapter is the inverse of the inbound: its role is to handle a message and use it to execute a SQL query. The message payload and headers are available by default as input parameters to the query, for instance:
<jdbc:outbound-channel-adapter query="insert into foos (id, status, name) values (:headers[$id], 0, :payload[foo])" channel="input" data-source="dataSource"/>
In the
example above, messages arriving on the channel "input" have a payload of
a map with key "foo", so the []
operator dereferences that
value from the map. The headers are also accessed as a map.
Note | |
---|---|
The parameters in the query above are bean property expressions on the incoming message (not Spring EL expressions). This behaviour is part of the
SqlParameterSource
which is the default source created by the outbound adapter. Other behaviour is possible in the adapter, and requires the user to inject a different
SqlParameterSourceFactory
.
|
The outbound adapter requires a reference to either a DataSource or
a JdbcTemplate. It can also have a
SqlParameterSourceFactory
injected to control the
binding of incoming message to the query.
If the input channel is a direct channel then the outbound adapter runs its query in the same thread, and therefor ethe same transaction (if there is one) as the sender of the message.
The outbound Gateway is like a combination of the outbound and inbound adapters: its role is to handle a message and use it to execute a SQL query and then respond with the result sending it to a reply channel. The message payload and headers are available by default as input parameters to the query, for instance:
<jdbc:outbound-gateway update="insert into foos (id, status, name) values (:headers[$id], 0, :payload[foo])" request-channel="input" reply-channel="output" data-source="dataSource" />
The result of the above would be to insert a record into the "foos"
table and return a message to the output channel indicating the number of
rows affected (the payload is a map {UPDATED=1}
.
If the update query is an insert with auto-generated keys, the reply
message can be populated with the generated keys by adding
keys-generated="true"
to the above example (this is not
the default because it is not supported by some database platforms). For
example:
<jdbc:outbound-gateway update="insert into foos (status, name) values (0, :payload[foo])" request-channel="input" reply-channel="output" data-source="dataSource" keys-generated="true"/>
Instead of the update count or the generated keys, you can also provide a select query to execute and generate a reply message that way (like the inbound adapter), e.g:
<jdbc:outbound-gateway update="insert into foos (id, status, name) values (:headers[$id], 0, :payload[foo])" query="select * from foos where id=:headers[$id]" request-channel="input" reply-channel="output" data-source="dataSource" />
Like with the adapters there is also the option to provide
SqlParameterSourceFactory
instances for request and
reply. The default is the same as for the outbound adapter, so the request
message is available as the root of an expression. If
keys-generated="true" then the root of the expression is the generated
keys (a map if there is only one or a list of maps if
multi-valued).
The outbound gateway requires a reference to either a DataSource or
a JdbcTemplate. It can also have a
SqlParameterSourceFactory
injected to control the
binding of incoming message to the query.
The JDBC module provides an implementation of the Spring Integration
MessageStore
(important in the Claim Check pattern)
and MessageGroupStore
(important in stateful
patterns like Aggregator) backed by a database. Both interfaces are
implemented by the JdbcMessageStore and there is also support for
configuring store instances in XML. For example:
<jdbc:message-store id="messageStore" data-source="dataSource"/>
A JdbcTemplate
can be specified instead of a
DataSource
.
Other optional attributes are show in the next example:
<jdbc:message-store id="messageStore" data-source="dataSource" lob-handler="lobHandler" table-prefix="MY_INT_"/>
Here we
have specified a LobHandler
for dealing with
messages as large objects (e.g. often necessary if using Oracle) and a
prefix for the table names in the queries generated by the store. The
table name prefix defaults to "INT_".
Spring Integration ships with some sample scripts that can be used
to initialize a database. In the spring-integration-jdbc JAR file you
will find scripts in the
org.springframework.integration.jdbc
package:
there is a create and a drop script example for a range of common
database platforms. A common way to use these scripts is to reference
them in a Spring
JDBC data source initializer. Note that the scripts are provided
as samples or specifications of the the required table and column names.
You may find that you need to enhance them for production use (e.g. with
index declarations).
It is common to use a JdbcMessageStore
as a
global store for a group of applications, or nodes in the same
application. To provide some portection against name clashes, and to
give control over the database meta-data configuration, the message
store allows the tables to be partitioned in two ways. One is to use
separate table names, by changing the prefix as described above, and the
other is to specify a "region" name for partitioning data within a
single table. An important use case for this is using the store to
manage persistent queues backing a Spring Integration channel. The
message data for a persistent channel is keyed in the store on the
channel name, so if the channel names are not globally unique then there
is the danger of channels picking up data that was not intended for
them. To avoid this the message store region can be used to keep data
separate for different physical channels that happen to have the same
logical name.