JDBC Message Store
Spring Integration provides two JDBC-specific message store implementations.
The JdbcMessageStore
is suitable for use with aggregators and the claim check pattern.
The JdbcChannelMessageStore
implementation provides a more targeted and scalable implementation specifically for message channels.
Note that, although you can use a JdbcMessageStore
to back a message channel, JdbcChannelMessageStore
is optimized for that purpose.
Starting with versions 5.0.11 and 5.1.2, the indexes for the JdbcChannelMessageStore have been optimized.
If you have large message groups in such a store, you may wish to alter the indexes.
Furthermore, the index for PriorityChannel is commented out because it is not needed unless you are using such channels backed by JDBC.
When using the OracleChannelMessageStoreQueryProvider, the priority channel index must be added because it is included in a hint in the query.
Initializing the Database
Before starting to use JDBC message store components, you should provision a target database with the appropriate objects.
Spring Integration ships with some sample scripts that can be used to initialize a database.
In the spring-integration-jdbc
JAR file, you can find scripts in the org.springframework.integration.jdbc
package.
It provides an example create and an example drop script for a range of common database platforms.
A common way to use these scripts is to reference them in a Spring JDBC data source initializer.
Note that the scripts are provided as samples and as specifications of the required table and column names.
You may find that you need to enhance them for production use (for example, by adding index declarations).
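The initializer approach mentioned above might look like the following minimal sketch. It assumes a PostgreSQL target and an empty schema; the configuration class name and bean name are hypothetical, and the script path is derived from the package named above.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;

// Hypothetical configuration class: runs the sample create script at startup.
@Configuration
public class MessageStoreSchemaConfig {

    @Bean
    public DataSourceInitializer messageStoreSchemaInitializer(DataSource dataSource) {
        // Sample create script shipped in the spring-integration-jdbc JAR (PostgreSQL assumed here)
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator(
                new ClassPathResource("org/springframework/integration/jdbc/schema-postgresql.sql"));

        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);
        initializer.setDatabasePopulator(populator);
        return initializer;
    }

}
```

Because the create script is a sample and is not written to be re-run against existing tables, a setup like this is best suited to tests or to a first-time provisioning step.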
Starting with version 6.2, the JdbcMessageStore
, JdbcChannelMessageStore
, JdbcMetadataStore
, and DefaultLockRepository
implement SmartLifecycle
and perform a SELECT COUNT query on their respective tables in the start()
method to ensure that the required table (according to the provided prefix) is present in the target database.
If the required table does not exist, the application context fails to start.
The check can be disabled via setCheckDatabaseOnStart(false)
.
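As a minimal sketch, assuming Java configuration and an injected DataSource (the configuration class and bean names are hypothetical), the startup check could be switched off like this:

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.store.JdbcMessageStore;

// Hypothetical configuration class for illustration only.
@Configuration
public class MessageStoreConfig {

    @Bean
    public JdbcMessageStore messageStore(DataSource dataSource) {
        JdbcMessageStore store = new JdbcMessageStore(dataSource);
        // Skip the SELECT COUNT table check performed in start()
        store.setCheckDatabaseOnStart(false);
        return store;
    }

}
```

Disabling the check may be useful when the tables are created only after the application context has started, at the cost of discovering a missing table on first use instead of at startup.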
The Generic JDBC Message Store
The JDBC module provides an implementation of the Spring Integration MessageStore
(important in the claim check pattern) and MessageGroupStore
(important in stateful patterns such as an aggregator) backed by a database.
Both interfaces are implemented by the JdbcMessageStore
, and there is support for configuring store instances in XML, as the following example shows:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
You can specify a JdbcTemplate
instead of a DataSource
.
The following example shows some other optional attributes:
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
In the preceding example, we have specified a LobHandler
for dealing with messages as large objects (which is often necessary for Oracle) and a prefix for the table names in the queries generated by the store.
The table name prefix defaults to INT_
.
Backing Message Channels
If you intend to back message channels with JDBC, we recommend using the JdbcChannelMessageStore
implementation.
It works only in conjunction with Message Channels.
Supported Databases
The JdbcChannelMessageStore
uses database-specific SQL queries to retrieve messages from the database.
Therefore, you must set the ChannelMessageStoreQueryProvider
property on the JdbcChannelMessageStore
.
This channelMessageStoreQueryProvider
provides the SQL queries for the particular database you specify.
Spring Integration provides support for the following relational databases:
- PostgreSQL
- HSQLDB
- MySQL
- Oracle
- Derby
- H2
- SqlServer
- Sybase
- DB2
If your database is not listed, you can implement the ChannelMessageStoreQueryProvider
interface and provide your own custom queries.
Version 4.0 added the MESSAGE_SEQUENCE
column to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.
Starting with version 6.2, ChannelMessageStoreQueryProvider
exposes an isSingleStatementForPoll
flag, where the PostgresChannelMessageStoreQueryProvider
returns true
and its queries for polls are now based on a single DELETE…RETURNING
statement.
The JdbcChannelMessageStore
consults the isSingleStatementForPoll
option and skips the separate DELETE
statement if the provider polls with a single statement.
Custom Message Insertion
Since version 5.0, by extending the ChannelMessageStorePreparedStatementSetter
class, you can provide a custom implementation for message insertion in the JdbcChannelMessageStore
.
You can use it to set different columns or change the table structure or serialization strategy.
For example, instead of the default serialization to byte[]
, you can store the message structure as a JSON string.
The following example uses the default implementation of setValues
to store common columns and overrides the behavior to store the message payload as a varchar
:
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.springframework.integration.jdbc.store.channel.ChannelMessageStorePreparedStatementSetter;
import org.springframework.messaging.Message;

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
            Object groupId, String region, boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }

}
Generally, we do not recommend using a relational database for queuing. If possible, consider using JMS- or AMQP-backed channels instead. For further reference, see the following resource: If you are still planning to use your database as a queue, consider using PostgreSQL and its notification mechanism, which is described in a subsequent section.
Concurrent Polling
When polling a message channel, you have the option to configure the associated Poller
with a TaskExecutor
reference.
Keep in mind, though, that if you use a JDBC-backed message channel and plan to poll the channel (and consequently the message store) transactionally with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue, and the performance gain expected from multiple threads may not materialize. For example, Apache Derby is problematic in that regard. To achieve better JDBC queue throughput and avoid issues when different threads may poll the same
Priority Channel
Starting with version 4.0, JdbcChannelMessageStore
implements PriorityCapableChannelMessageStore
and provides the priorityEnabled
option, letting it be used as a message-store
reference for priority-queue
instances.
For this purpose, the INT_CHANNEL_MESSAGE
table has a MESSAGE_PRIORITY
column to store the value of PRIORITY
message headers.
In addition, a new MESSAGE_SEQUENCE
column lets us achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond.
Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
.
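To make the effect of that ordering concrete, the following plain-Java sketch models it with a comparator. It does not use Spring Integration; the Row record and the PollOrderSketch class are hypothetical stand-ins for rows of the INT_CHANNEL_MESSAGE table, and the sort only mirrors the documented ORDER BY clause.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of the documented poll ordering; not Spring Integration code.
class PollOrderSketch {

    // Hypothetical in-memory stand-in for a row of INT_CHANNEL_MESSAGE.
    record Row(String id, Integer priority, long createdDate, long sequence) { }

    // Mirrors: MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
    static final Comparator<Row> POLL_ORDER =
            Comparator.comparing(Row::priority, Comparator.nullsLast(Comparator.<Integer>reverseOrder()))
                    .thenComparingLong(Row::createdDate)
                    .thenComparingLong(Row::sequence);

    // Returns a sorted copy, leaving the input list untouched.
    static List<Row> pollOrder(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(POLL_ORDER);
        return sorted;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("no-priority", null, 1L, 1L),
                new Row("prio-5-later", 5, 2L, 3L),
                new Row("prio-9", 9, 3L, 4L),
                new Row("prio-5-earlier", 5, 2L, 2L));
        // Prints the ids in the order in which the rows would be polled
        pollOrder(rows).forEach(row -> System.out.println(row.id()));
    }

}
```

Two rows with the same priority and the same creation timestamp are separated only by the sequence value, which is why the sequence column matters for FIFO behavior within a priority level.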
We do not recommend using the same JdbcChannelMessageStore bean for priority and non-priority queue channels, because the priorityEnabled option applies to the entire store and proper FIFO queue semantics are not retained for the queue channel.
However, the same INT_CHANNEL_MESSAGE table (and even region) can be used for both JdbcChannelMessageStore types.
To configure that scenario, you can extend one message store bean from the other, as the following example shows:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>
Partitioning a Message Store
It is common to use a JdbcMessageStore
as a global store for a group of applications or nodes in the same application.
To provide some protection against name clashes and to give control over the database meta-data configuration, the message store lets the tables be partitioned in two ways.
One way is to use separate table names, by changing the prefix (as described earlier).
The other way is to specify a region
name for partitioning data within a single table.
An important use case for the second approach is when the MessageStore
is managing persistent queues that back a Spring Integration Message Channel.
The message data for a persistent channel is keyed in the store on the channel name.
Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them.
To avoid this danger, you can use the message store region
to keep data separate for different physical channels that have the same logical name.
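A minimal sketch of region-based partitioning, assuming Java configuration, PostgreSQL, and two applications sharing one table (the class name, bean names, and region values are hypothetical):

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;

// Hypothetical configuration: two stores share one table but use different regions.
@Configuration
public class RegionPartitioningConfig {

    @Bean
    public JdbcChannelMessageStore ordersStore(DataSource dataSource) {
        return storeForRegion(dataSource, "ORDERS_APP");
    }

    @Bean
    public JdbcChannelMessageStore billingStore(DataSource dataSource) {
        return storeForRegion(dataSource, "BILLING_APP");
    }

    private static JdbcChannelMessageStore storeForRegion(DataSource dataSource, String region) {
        JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
        store.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
        // Rows written through this store are tagged with the region,
        // so channels with the same name in another region do not see them
        store.setRegion(region);
        return store;
    }

}
```

With this arrangement, a channel named, say, "input" in each application can be backed by the same table without either application consuming the other's messages.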
PostgreSQL: Receiving Push Notifications
PostgreSQL offers a listen and notification framework for receiving push notifications upon database table manipulations.
Spring Integration leverages this mechanism (starting with version 6.0) to allow for receiving push notifications when new messages are added to a JdbcChannelMessageStore
.
When using this feature, a database trigger must be defined. Its definition can be found in the comments of the schema-postgresql.sql
file, which is included in the JDBC module of Spring Integration.
Push notifications are received via the PostgresChannelMessageTableSubscriber
class which allows its subscribers to receive a callback upon the arrival of new messages for any given region
and groupId
.
These notifications are received even if a message was appended on a different JVM, but to the same database.
The PostgresSubscribableChannel
implementation uses a PostgresChannelMessageTableSubscriber.Subscription
contract to pull messages from the store in reaction to notifications from the PostgresChannelMessageTableSubscriber
.
For example, push notifications for some group
can be received as follows:
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
        @Value("${spring.datasource.url}") String url,
        @Value("${spring.datasource.username}") String username,
        @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
            DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
        PostgresChannelMessageTableSubscriber subscriber,
        JdbcChannelMessageStore messageStore) {
    return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
Transaction support
Starting with version 6.0.5, specifying a PlatformTransactionManager
on a PostgresSubscribableChannel
causes subscribers to be notified within a transaction.
An exception in a subscriber will cause the transaction to be rolled back and the message to be put back in the message store.
Transactional support is not activated by default.
Retries
Starting with version 6.0.5, a retry policy can be specified by providing a RetryTemplate
to the PostgresSubscribableChannel
.
By default, no retries are performed.
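The transaction and retry options described above could be combined as in the following sketch. It assumes that a PlatformTransactionManager bean is available and that the RetryTemplate in question is the Spring Retry class (org.springframework.retry.support.RetryTemplate); the class name, the attempt count, and the backoff are illustrative values, not recommendations.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.transaction.PlatformTransactionManager;

// Hypothetical configuration class for illustration only.
@Configuration
public class TransactionalPostgresChannelConfig {

    @Bean
    public PostgresSubscribableChannel channel(
            PostgresChannelMessageTableSubscriber subscriber,
            JdbcChannelMessageStore messageStore,
            PlatformTransactionManager transactionManager) {
        PostgresSubscribableChannel channel =
                new PostgresSubscribableChannel(messageStore, "some group", subscriber);
        // Notify subscribers in a transaction; a subscriber exception rolls it back
        channel.setTransactionManager(transactionManager);
        // Illustrative policy: up to 3 attempts with a fixed 1-second backoff
        channel.setRetryTemplate(RetryTemplate.builder()
                .maxAttempts(3)
                .fixedBackoff(1000)
                .build());
        return channel;
    }

}
```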
Any active For this need of an exclusive connection, it is also recommended that a JVM only runs a single |