JDBC Message Store

Spring Integration provides two JDBC specific message store implementations. The JdbcMessageStore is suitable for use with aggregators and the claim check pattern. The JdbcChannelMessageStore implementation provides a more targeted and scalable implementation specifically for message channel.

Note that you can use a JdbcMessageStore to back a message channel, JdbcChannelMessageStore is optimized for that purpose.

Starting with versions 5.0.11, 5.1.2, the indexes for the JdbcChannelMessageStore have been optimized. If you have large message groups in such a store, you may wish to alter the indexes. Furthermore, the index for PriorityChannel is commented out because it is not needed unless you are using such channels backed by JDBC.
When using the OracleChannelMessageStoreQueryProvider, the priority channel index must be added because it is included in a hint in the query.

Initializing the Database

Before starting to use JDBC message store components, you should provision a target database with the appropriate objects.

Spring Integration ships with some sample scripts that can be used to initialize a database. In the spring-integration-jdbc JAR file, you can find scripts in the org.springframework.integration.jdbc package. It provides an example create and an example drop script for a range of common database platforms. A common way to use these scripts is to reference them in a Spring JDBC data source initializer. Note that the scripts are provided as samples and as specifications of the required table and column names. You may find that you need to enhance them for production use (for, example, by adding index declarations).

Starting with version 6.2, the JdbcMessageStore, JdbcChannelMessageStore, JdbcMetadataStore, and DefaultLockRepository implement SmartLifecycle and perform a`SELECT COUNT` query, on their respective tables, in the start() method to ensure that the required table (according to the provided prefix) is present in the target database. If the required table does not exist, the application context fails to start. The check can be disabled via setCheckDatabaseOnStart(false).

The Generic JDBC Message Store

The JDBC module provides an implementation of the Spring Integration MessageStore (important in the claim check pattern) and MessageGroupStore (important in stateful patterns such as an aggregator) backed by a database. Both interfaces are implemented by the JdbcMessageStore, and there is support for configuring store instances in XML, as the following example shows:

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

You can specify a JdbcTemplate instead of a DataSource.

The following example shows some other optional attributes:

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

In the preceding example, we have specified a LobHandler for dealing with messages as large objects (which is often necessary for Oracle) and a prefix for the table names in the queries generated by the store. The table name prefix defaults to INT_.

Backing Message Channels

If you intend to back message channels with JDBC, we recommend using the JdbcChannelMessageStore implementation. It works only in conjunction with Message Channels.

Supported Databases

The JdbcChannelMessageStore uses database-specific SQL queries to retrieve messages from the database. Therefore, you must set the ChannelMessageStoreQueryProvider property on the JdbcChannelMessageStore. This channelMessageStoreQueryProvider provides the SQL queries for the particular database you specify. Spring Integration provides support for the following relational databases:

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

If your database is not listed, you can implement the ChannelMessageStoreQueryProvider interface and provide your own custom queries.

Version 4.0 added the MESSAGE_SEQUENCE column to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.

Starting with version 6.2, ChannelMessageStoreQueryProvider exposes a isSingleStatementForPoll flag, where the PostgresChannelMessageStoreQueryProvider returns true and its queries for polls are now based on a single DELETE…​RETURNING statement. The JdbcChannelMessageStore consults with the isSingleStatementForPoll option and skips a separate DELETE statement if only single poll statement is supported.

Custom Message Insertion

Since version 5.0, by overloading the ChannelMessageStorePreparedStatementSetter class, you can provide a custom implementation for message insertion in the JdbcChannelMessageStore. You can use it to set different columns or change the table structure or serialization strategy. For example, instead of default serialization to byte[], you can store its structure as a JSON string.

The following example uses the default implementation of setValues to store common columns and overrides the behavior to store the message payload as a varchar:

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

Generally, we do not recommend using a relational database for queuing. Instead, if possible, consider using either JMS- or AMQP-backed channels instead. For further reference, see the following resource:

If you are still planning to use your database as a queue, consider using PostgreSQL and its notification mechanism which is described in a subsequent section.

Concurrent Polling

When polling a message channel, you have the option to configure the associated Poller with a TaskExecutor reference.

Keep in mind, though, that if you use a JDBC backed message channel, and you plan to poll the channel and consequently the message store transactional with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue and the performance, when using multiple threads, may not materialize as expected. For example, Apache Derby is problematic in that regard.

To achieve better JDBC queue throughput and avoid issues when different threads may poll the same Message from the queue, it is important to set the usingIdCache property of JdbcChannelMessageStore to true when using databases that do not support MVCC. The following example shows how to do so:

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

Priority Channel

Starting with version 4.0, JdbcChannelMessageStore implements PriorityCapableChannelMessageStore and provides the priorityEnabled option, letting it be used as a message-store reference for priority-queue instances. For this purpose, the INT_CHANNEL_MESSAGE table has a MESSAGE_PRIORITY column to store the value of PRIORITY message headers. In addition, a new MESSAGE_SEQUENCE column lets us achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond. Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE.

We do not recommend using the same JdbcChannelMessageStore bean for priority and non-priority queue channels, because the priorityEnabled option applies to the entire store and proper FIFO queue semantics are not retained for the queue channel. However, the same INT_CHANNEL_MESSAGE table (and even region) can be used for both JdbcChannelMessageStore types. To configure that scenario, you can extend one message store bean from the other, as the following example shows:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

Partitioning a Message Store

It is common to use a JdbcMessageStore as a global store for a group of applications or nodes in the same application. To provide some protection against name clashes and to give control over the database meta-data configuration, the message store lets the tables be partitioned in two ways. One way is to use separate table names, by changing the prefix (as described earlier). The other way is to specify a region name for partitioning data within a single table. An important use case for the second approach is when the MessageStore is managing persistent queues that back a Spring Integration Message Channel. The message data for a persistent channel is keyed in the store on the channel name. Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them. To avoid this danger, you can use the message store region to keep data separate for different physical channels that have the same logical name.

PostgreSQL: Receiving Push Notifications

PostgreSQL offers a listen and notification framework for receiving push notifications upon database table manipulations. Spring Integration leverages this mechanism (starting with version 6.0) to allow for receiving push notifications when new messages are added to a JdbcChannelMessageStore. When using this feature, a database trigger must be defined, which can be found as part of the comments of the schema-postgresql.sql file which is included in the JDBC module of Spring Integration.

Push notifications are received via the PostgresChannelMessageTableSubscriber class which allows its subscribers to receive a callback upon the arrival of new messages for any given region and groupId. These notifications are received even if a message was appended on a different JVM, but to the same database. The PostgresSubscribableChannel implementation uses a PostgresChannelMessageTableSubscriber.Subscription contract to pull messages from the store as a reaction for notification from the mentioned PostgresChannelMessageTableSubscriber notifications.

For example, push notifications for some group can be received as follows:

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

Transaction support

Starting with version 6.0.5, specifying a PlatformTransactionManager on a PostgresSubscribableChannel will notify subscribers in a transaction. An exception in a subscriber will cause the transaction to be rolled back and the message to be put back in the message store. Transactional support is not activated by default.

Retries

Starting with version 6.0.5, a retry policy can be specified by providing a RetryTemplate to the PostgresSubscribableChannel. By default, no retries are performed.

Any active PostgresChannelMessageTableSubscriber occupies an exclusive JDBC Connection for the duration of its active life cycle. It is therefore important that this connection does not originate from a pooling DataSource. Such connection pools do normally expect that issued connections are closed within a predefined timeout window.

For this need of an exclusive connection, it is also recommended that a JVM only runs a single PostgresChannelMessageTableSubscriber which can be used to register any number of subscriptions.