21. JDBC Support

Spring Integration provides Channel Adapters for receiving and sending messages via database queries.

21.1 Inbound Channel Adapter

The main function of an inbound Channel Adapter is to execute a SQL SELECT query and turn the result set into a message. The message payload is the whole result set, expressed as a List, and the types of the items in the list depend on the row-mapping strategy that is used. The default strategy is a generic mapper that just returns a Map for each row in the query result. Optionally, this can be changed by adding a reference to a RowMapper instance (see the Spring JDBC documentation for more detailed information about row mapping).
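
As a sketch of the RowMapper option, the configuration below assumes that the adapter exposes the mapper through a row-mapper attribute. The ItemRowMapper class and its package are hypothetical placeholders for your own RowMapper implementation, and the beans prefix is assumed to be bound to the Spring beans namespace.

<!-- Hypothetical RowMapper implementation; substitute your own class -->
<beans:bean id="itemRowMapper" class="com.example.ItemRowMapper"/>

<!-- row-mapper is assumed to be the attribute that references the mapper -->
<jdbc:inbound-channel-adapter query="select * from item where status=2"
  channel="target" data-source="dataSource"
  row-mapper="itemRowMapper"/>

With a mapper configured this way, the payload would be a List of whatever type the mapper returns rather than a List of Maps.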

Note

If you want to convert rows in the SELECT query result to individual messages you can use a downstream splitter.
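
For instance, a default splitter placed on the adapter's channel could break the List payload into one message per row. This is only a sketch: the channel name items is hypothetical, and the splitter element is assumed to come from the core Spring Integration namespace.

<!-- With no ref or method configured, the default splitter is assumed to emit one message per element of a List payload -->
<splitter input-channel="target" output-channel="items"/>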

The inbound adapter also requires a reference to either a JdbcTemplate instance or a DataSource.

In addition to the SELECT statement that generates the messages, the adapter can also have an UPDATE statement that marks the records as processed so that they do not show up in the next poll. The update can be parameterised by the list of ids from the original select. By default this is done through a naming convention: a column in the input result set called "id" is translated into a list in the parameter map for the update called "id". The following example defines an inbound Channel Adapter with an update query and a DataSource reference.

<jdbc:inbound-channel-adapter query="select * from item where status=2"
  channel="target" data-source="dataSource"
  update="update item set status=10 where id in (:id)" />

Note
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which in this case is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration. The underlying Spring JDBC features limit the available expressions (e.g. most special characters other than period are disallowed), but since the target is usually a list of objects or an individual object addressable by simple bean paths, this isn't unduly restrictive.

To change the parameter generation strategy you can inject a SqlParameterSourceFactory into the adapter to override the default behaviour (the adapter has a sql-parameter-source-factory attribute).
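
A minimal sketch of such an override is shown below. The sql-parameter-source-factory attribute is the one mentioned above; the CustomSqlParameterSourceFactory class is a hypothetical implementation of SqlParameterSourceFactory, and the :itemId parameter is only an illustration of a name that such a factory might resolve.

<!-- Hypothetical SqlParameterSourceFactory implementation; substitute your own class -->
<beans:bean id="parameterSourceFactory" class="com.example.CustomSqlParameterSourceFactory"/>

<!-- The custom factory, not the default naming convention, now decides how :itemId is resolved -->
<jdbc:inbound-channel-adapter query="select * from item where status=2"
  channel="target" data-source="dataSource"
  update="update item set status=10 where id in (:itemId)"
  sql-parameter-source-factory="parameterSourceFactory"/>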

21.1.1 Polling and Transactions

The inbound adapter accepts a regular Spring Integration poller as a sub-element, so that, for instance, the frequency of the polling can be controlled. A very important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, for example:

<jdbc:inbound-channel-adapter query="..."
  channel="target" data-source="dataSource"
  update="...">
  <poller fixed-rate="1000">
    <transactional/>
  </poller>
</jdbc:inbound-channel-adapter>

Note
If a poller is not explicitly specified, a default poller is used (as usual with Spring Integration, the default can be defined as a top-level bean).

In this example the database is polled every 1000 milliseconds, and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown, but as long as it is aware of the data source, the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread, and hence the same transaction. Then, if any of them fails, the transaction rolls back and the input data are reverted to their original state.
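
The transaction manager that is not shown could be declared as in the following sketch. It assumes that the transactional element looks up a bean named transactionManager when no explicit reference is given, and that dataSource is the same bean that the adapter uses. The beans prefix is assumed to be bound to the Spring beans namespace.

<!-- Bound to the same DataSource as the adapter so that the select and the update join one transaction -->
<beans:bean id="transactionManager"
  class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
  <beans:property name="dataSource" ref="dataSource"/>
</beans:bean>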

21.2 Outbound Channel Adapter

The outbound Channel Adapter is the inverse of the inbound: its role is to handle a message and use it to execute a SQL query. The message payload and headers are available by default as input parameters to the query, for instance:

<jdbc:outbound-channel-adapter
  query="insert into foos (id, status, name) values (:headers[$id], 0, :payload[foo])"
  channel="input" data-source="dataSource"/>

In the example above, messages arriving on the channel "input" have a payload of a map with key "foo", so the [] operator dereferences that value from the map. The headers are also accessed as a map.

Note
The parameters in the query above are bean property expressions on the incoming message (not Spring EL expressions). This behaviour is part of the SqlParameterSource that the outbound adapter creates by default. Other behaviour is possible in the adapter, and requires the user to inject a different SqlParameterSourceFactory.

The outbound adapter requires a reference to either a DataSource or a JdbcTemplate. It can also have a SqlParameterSourceFactory injected to control the binding of the incoming message to the query.

If the input channel is a direct channel, then the outbound adapter runs its query in the same thread, and therefore the same transaction (if there is one), as the sender of the message.

21.3 Outbound Gateway

The outbound Gateway is like a combination of the outbound and inbound adapters: its role is to handle a message, use it to execute a SQL query, and then respond with the result by sending it to a reply channel. The message payload and headers are available by default as input parameters to the query, for instance:

<jdbc:outbound-gateway
  update="insert into foos (id, status, name) values (:headers[$id], 0, :payload[foo])"
  request-channel="input" reply-channel="output" data-source="dataSource" />

The result of the above would be to insert a record into the "foos" table and return a message to the output channel indicating the number of rows affected (the payload is a map: {UPDATED=1}).

If the update query is an insert with auto-generated keys, the reply message can be populated with the generated keys by adding keys-generated="true" to the above example (this is not the default because it is not supported by some database platforms). For example:

<jdbc:outbound-gateway
  update="insert into foos (status, name) values (0, :payload[foo])"
  request-channel="input" reply-channel="output" data-source="dataSource"
  keys-generated="true"/>

Instead of the update count or the generated keys, you can also provide a select query to execute and generate a reply message from its result (as the inbound adapter does), for example:

<jdbc:outbound-gateway
  update="insert into foos (id, status, name) values (:headers[$id], 0, :payload[foo])"
  query="select * from foos where id=:headers[$id]"
  request-channel="input" reply-channel="output" data-source="dataSource" />

As with the adapters, there is also the option to provide SqlParameterSourceFactory instances for request and reply. The default is the same as for the outbound adapter, so the request message is available as the root of an expression. If keys-generated="true" then the root of the expression is the generated keys (a map if there is only one or a list of maps if multi-valued).
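
The following sketch shows how separate factories for request and reply might be wired in. The attribute names request-sql-parameter-source-factory and reply-sql-parameter-source-factory are assumptions to be checked against the schema of the version in use, and both factory classes are hypothetical.

<!-- Hypothetical SqlParameterSourceFactory implementations; substitute your own classes -->
<beans:bean id="requestFactory" class="com.example.RequestSqlParameterSourceFactory"/>
<beans:bean id="replyFactory" class="com.example.ReplySqlParameterSourceFactory"/>

<!-- Attribute names for the two factories are assumed, not confirmed by the text above -->
<jdbc:outbound-gateway
  update="..."
  query="..."
  request-channel="input" reply-channel="output" data-source="dataSource"
  request-sql-parameter-source-factory="requestFactory"
  reply-sql-parameter-source-factory="replyFactory"/>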

The outbound gateway requires a reference to either a DataSource or a JdbcTemplate. It can also have a SqlParameterSourceFactory injected to control the binding of the incoming message to the query.

21.4 Message Store

The JDBC module provides an implementation of the Spring Integration MessageStore (important in the Claim Check pattern) and MessageGroupStore (important in stateful patterns like Aggregator) backed by a database. Both interfaces are implemented by the JdbcMessageStore and there is also support for configuring store instances in XML. For example:

<jdbc:message-store id="messageStore" data-source="dataSource"/>

A JdbcTemplate can be specified instead of a DataSource.
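
As a sketch, and assuming that the attribute for this is named jdbc-operations and that a JdbcTemplate bean called jdbcTemplate is defined elsewhere in the context, the store could be configured like this:

<!-- jdbc-operations is an assumed attribute name; jdbcTemplate is a bean defined elsewhere -->
<jdbc:message-store id="messageStore" jdbc-operations="jdbcTemplate"/>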

Other optional attributes are shown in the next example:

<jdbc:message-store id="messageStore" data-source="dataSource"
  lob-handler="lobHandler" table-prefix="MY_INT_"/>

Here we have specified a LobHandler for dealing with messages as large objects (often necessary if using Oracle) and a prefix for the table names in the queries generated by the store. The table name prefix defaults to "INT_".
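
The lobHandler bean referenced in that example is not shown. One possible definition, given purely as an illustration, uses the DefaultLobHandler from Spring JDBC; an Oracle database may call for a different LobHandler implementation. The beans prefix is assumed to be bound to the Spring beans namespace.

<!-- DefaultLobHandler is chosen only for illustration; pick the LobHandler that suits your platform -->
<beans:bean id="lobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>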

21.4.1 Initializing the Database

Spring Integration ships with some sample scripts that can be used to initialize a database. In the spring-integration-jdbc JAR file you will find scripts in the org.springframework.integration.jdbc package: there is a create and a drop script example for a range of common database platforms. A common way to use these scripts is to reference them in a Spring JDBC data source initializer. Note that the scripts are provided as samples or specifications of the required table and column names. You may find that you need to enhance them for production use (e.g. with index declarations).
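
A data source initializer of that kind might look like the sketch below. The spring-jdbc prefix is a hypothetical name chosen to avoid clashing with the jdbc prefix used for the adapters in this chapter, and it is assumed to be bound to the Spring JDBC namespace (http://www.springframework.org/schema/jdbc). The script file name is also an assumption; check the JAR for the file that matches your platform.

<spring-jdbc:initialize-database data-source="dataSource">
  <!-- Script name is assumed; the package path is the one given above -->
  <spring-jdbc:script location="classpath:org/springframework/integration/jdbc/schema-derby.sql"/>
</spring-jdbc:initialize-database>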

21.4.2 Partitioning a Message Store

It is common to use a JdbcMessageStore as a global store for a group of applications, or nodes in the same application. To provide some protection against name clashes, and to give control over the database meta-data configuration, the message store allows the tables to be partitioned in two ways. One is to use separate table names, by changing the prefix as described above, and the other is to specify a "region" name for partitioning data within a single table. An important use case for this is using the store to manage persistent queues backing a Spring Integration channel. The message data for a persistent channel is keyed in the store on the channel name, so if the channel names are not globally unique then there is the danger of channels picking up data that was not intended for them. To avoid this, the message store region can be used to keep data separate for different physical channels that happen to have the same logical name.
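
The persistent queue use case could be sketched as follows. The region attribute name, the region value ORDERS_APP, and the message-store attribute on the queue element are all assumptions made for illustration; the channel and queue elements are assumed to come from the core Spring Integration namespace.

<!-- Each application would use its own region value so that identically named channels do not share data -->
<jdbc:message-store id="messageStore" data-source="dataSource" region="ORDERS_APP"/>

<!-- A queue channel whose messages are held in the JDBC store rather than in memory -->
<channel id="input">
  <queue message-store="messageStore"/>
</channel>

A second application sharing the same tables would declare the same channel name with a different region value.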