Spring Integration

org.springframework.integration.jdbc.store
Class JdbcChannelMessageStore

java.lang.Object
  extended by org.springframework.integration.store.AbstractMessageGroupStore
      extended by org.springframework.integration.jdbc.store.JdbcChannelMessageStore
All Implemented Interfaces:
java.lang.Iterable<MessageGroup>, org.springframework.beans.factory.InitializingBean, MessageGroupStore

@ManagedResource
public class JdbcChannelMessageStore
extends AbstractMessageGroupStore
implements org.springframework.beans.factory.InitializingBean

Channel-specific implementation of MessageGroupStore using a relational database via JDBC. This message store shall be used for message channels only.

NOTICE: This implementation may change for Spring Integration 3.0. It is provided for use-cases where the current JdbcMessageStore is not delivering the desired performance characteristics.

As such, the JdbcChannelMessageStore uses database specific SQL queries.

Unlike the JdbcMessageStore, this implementation uses a single database table. The SQL scripts to create the necessary table are packaged under org/springframework/integration/jdbc/messagestore/channel/schema-*.sql, where * denotes the target database type.

Since:
2.2
Author:
Gunnar Hillert

Field Summary
static java.lang.String CREATED_DATE_KEY
          The name of the message header that stores a timestamp for the time the message was inserted.
static int DEFAULT_LONG_STRING_LENGTH
           
static java.lang.String DEFAULT_REGION
          Default region property, used to partition the message store.
static java.lang.String DEFAULT_TABLE_PREFIX
          Default value for the table prefix property.
static java.lang.String SAVED_KEY
          The name of the message header that stores a flag to indicate that the message has been saved.
 
Constructor Summary
JdbcChannelMessageStore()
          Convenient constructor for configuration use.
JdbcChannelMessageStore(javax.sql.DataSource dataSource)
          Create a MessageStore with all mandatory properties.
 
Method Summary
 MessageGroup addMessageToGroup(java.lang.Object groupId, Message<?> message)
          Store a message in the database.
 void afterPropertiesSet()
          Check mandatory properties (DataSource and setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)).
 void completeGroup(java.lang.Object groupId)
          Method not implemented.
protected  Message<?> doPollForMessage(java.lang.String groupIdKey)
          This method executes a call to the DB to get the oldest Message in the MessageGroup which in the context of the JdbcChannelMessageStore means the channel identifier.
 long getMessageCount()
          Method not implemented.
 int getMessageCountForAllMessageGroups()
          Method not implemented.
 MessageGroup getMessageGroup(java.lang.Object groupId)
          Not fully used.
 int getMessageGroupCount()
          Method not implemented.
protected  java.lang.String getQuery(java.lang.String sqlQuery)
          Replace patterns in the input to produce a valid SQL query.
 int getSizeOfIdCache()
          Returns the size of the Message Id Cache, which caches Message Ids for those messages that are currently being processed.
 java.util.Iterator<MessageGroup> iterator()
          Method not implemented.
 int messageGroupSize(java.lang.Object groupId)
          Returns the number of messages persisted for the specified channel id (groupId) and the specified region (setRegion(String)).
 Message<?> pollMessageFromGroup(java.lang.Object groupId)
          Polls the database for a new message that is persisted for the given group id which represents the channel identifier.
 void removeFromIdCache(java.lang.String messageId)
          Remove a Message Id from the idCache.
 MessageGroup removeMessageFromGroup(java.lang.Object groupId, Message<?> messageToRemove)
          Remove a single message from the database.
 void removeMessageGroup(java.lang.Object groupId)
          Will remove all messages from the message channel.
 void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)
           Sets the database specific ChannelMessageStoreQueryProvider to use.
 void setDataSource(javax.sql.DataSource dataSource)
          The JDBC DataSource to use when interacting with the database.
 void setDeserializer(org.springframework.core.serializer.Deserializer<? extends Message<?>> deserializer)
          A converter for deserializing byte arrays to messages.
 void setJdbcTemplate(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate)
          The JdbcOperations to use when interacting with the database.
 void setLastReleasedSequenceNumberForGroup(java.lang.Object groupId, int sequenceNumber)
          Method not implemented.
 void setLobHandler(org.springframework.jdbc.support.lob.LobHandler lobHandler)
          Override the LobHandler that is used to create and unpack large objects in SQL queries.
 void setMessageRowMapper(MessageRowMapper messageRowMapper)
          Allows for passing in a custom MessageRowMapper.
 void setRegion(java.lang.String region)
          A unique grouping identifier for all messages persisted with this store.
 void setSerializer(org.springframework.core.serializer.Serializer<? super Message<?>> serializer)
          A converter for serializing messages to byte arrays for storage.
 void setTablePrefix(java.lang.String tablePrefix)
          Public setter for the table prefix property.
 void setUsingIdCache(boolean usingIdCache)
          Consider using this property when polling the database transactionally using multiple parallel threads, that is, when the poller is configured with a task executor.
 
Methods inherited from class org.springframework.integration.store.AbstractMessageGroupStore
expireMessageGroups, isTimeoutOnIdle, registerMessageGroupExpiryCallback, setExpiryCallbacks, setTimeoutOnIdle
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_TABLE_PREFIX

public static final java.lang.String DEFAULT_TABLE_PREFIX
Default value for the table prefix property.

See Also:
Constant Field Values

DEFAULT_REGION

public static final java.lang.String DEFAULT_REGION
Default region property, used to partition the message store. For example, a separate Spring Integration application with overlapping channel names may use the same message store by providing a distinct region name.

See Also:
Constant Field Values

DEFAULT_LONG_STRING_LENGTH

public static final int DEFAULT_LONG_STRING_LENGTH
See Also:
Constant Field Values

SAVED_KEY

public static final java.lang.String SAVED_KEY
The name of the message header that stores a flag to indicate that the message has been saved. This is an optimization for the put method.


CREATED_DATE_KEY

public static final java.lang.String CREATED_DATE_KEY
The name of the message header that stores a timestamp for the time the message was inserted.

Constructor Detail

JdbcChannelMessageStore

public JdbcChannelMessageStore()
Convenient constructor for configuration use.


JdbcChannelMessageStore

public JdbcChannelMessageStore(javax.sql.DataSource dataSource)
Create a MessageStore with all mandatory properties. The passed-in DataSource is used to instantiate a JdbcTemplate with JdbcTemplate.setFetchSize(int) set to 1 and with JdbcTemplate.setMaxRows(int) set to 1.

Parameters:
dataSource - a DataSource
Method Detail

setDataSource

public void setDataSource(javax.sql.DataSource dataSource)
The JDBC DataSource to use when interacting with the database. The passed-in DataSource is used to instantiate a JdbcTemplate with JdbcTemplate.setFetchSize(int) set to 1 and with JdbcTemplate.setMaxRows(int) set to 1.

Parameters:
dataSource - a DataSource

setDeserializer

public void setDeserializer(org.springframework.core.serializer.Deserializer<? extends Message<?>> deserializer)
A converter for deserializing byte arrays to messages.

Parameters:
deserializer - the deserializer to set

setJdbcTemplate

public void setJdbcTemplate(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate)
The JdbcOperations to use when interacting with the database. Set either this property or the dataSource. Please consider passing in a JdbcTemplate with a fetchSize property of 1. This is particularly important for Oracle to ensure First In, First Out (FIFO) message retrieval characteristics.

Parameters:
jdbcTemplate - a JdbcOperations

setLastReleasedSequenceNumberForGroup

public void setLastReleasedSequenceNumberForGroup(java.lang.Object groupId,
                                                  int sequenceNumber)
Method not implemented.

Specified by:
setLastReleasedSequenceNumberForGroup in interface MessageGroupStore
Throws:
java.lang.UnsupportedOperationException

setLobHandler

public void setLobHandler(org.springframework.jdbc.support.lob.LobHandler lobHandler)
Override the LobHandler that is used to create and unpack large objects in SQL queries. The default is fine for almost all platforms, but some Oracle drivers require a native implementation.

Parameters:
lobHandler - a LobHandler

setMessageRowMapper

public void setMessageRowMapper(MessageRowMapper messageRowMapper)
Allows for passing in a custom MessageRowMapper. The MessageRowMapper is used to convert the selected database row representing the persisted message into the actual Message object.

Parameters:
messageRowMapper - Must not be null

setChannelMessageStoreQueryProvider

public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)

Sets the database-specific ChannelMessageStoreQueryProvider to use. The ChannelMessageStoreQueryProvider supplies the SQL queries that the JdbcChannelMessageStore uses to retrieve messages from the database. Database-specific implementations are provided, for example the OracleChannelMessageStoreQueryProvider.

You can also provide your own query implementations if you need to support additional databases or fine-tune the queries for your requirements.

Parameters:
channelMessageStoreQueryProvider - Must not be null.

setRegion

public void setRegion(java.lang.String region)
A unique grouping identifier for all messages persisted with this store. Using multiple regions allows the store to be partitioned (if necessary) for different purposes. Defaults to DEFAULT_REGION.

Parameters:
region - the region name to set
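
The partitioning idea can be illustrated with a small in-memory model. This is only a sketch, not the store's implementation: the class RegionPartitionSketch, its Row type, and the region names used below are hypothetical, and a plain list stands in for the single database table. It shows two applications with the same channel key sharing one table while their messages stay separate because every lookup matches on both region and group key.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; names here are hypothetical, not library API.
class RegionPartitionSketch {

    // One row of the single (simulated) channel-message table.
    static final class Row {
        final String region;
        final String groupKey;
        final String payload;

        Row(String region, String groupKey, String payload) {
            this.region = region;
            this.groupKey = groupKey;
            this.payload = payload;
        }
    }

    private final List<Row> table = new ArrayList<Row>();

    void insert(String region, String groupKey, String payload) {
        table.add(new Row(region, groupKey, payload));
    }

    // Counts rows that match both the region and the channel's group key.
    int messageGroupSize(String region, String groupKey) {
        int count = 0;
        for (Row row : table) {
            if (row.region.equals(region) && row.groupKey.equals(groupKey)) {
                count++;
            }
        }
        return count;
    }
}
```

With rows inserted for regions "APP_A" and "APP_B" under the same group key, each region only sees its own count.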

setSerializer

public void setSerializer(org.springframework.core.serializer.Serializer<? super Message<?>> serializer)
A converter for serializing messages to byte arrays for storage.

Parameters:
serializer - The serializer to set

setTablePrefix

public void setTablePrefix(java.lang.String tablePrefix)
Public setter for the table prefix property. This will be prefixed to all the table names before queries are executed. Defaults to DEFAULT_TABLE_PREFIX.

Parameters:
tablePrefix - the tablePrefix to set

setUsingIdCache

public void setUsingIdCache(boolean usingIdCache)

Consider using this property when polling the database transactionally using multiple parallel threads, that is, when the poller is configured with a task executor.

The issue is that pollMessageFromGroup(Object) looks for the oldest entry for a given channel (groupKey) and region (setRegion(String)). If you do that with multiple threads and you are using transactions, other threads may be waiting for that same locked row.

If using the provided OracleChannelMessageStoreQueryProvider, don't set usingIdCache to true, as the Oracle query will ignore locked rows.

Using the id cache, the JdbcChannelMessageStore will store each message id in an in-memory collection for the duration of processing. With that, any polling threads will explicitly exclude those messages from being polled.

For this to work, you must set up the corresponding TransactionSynchronizationFactory:

 <int:transaction-synchronization-factory id="syncFactory">
     <int:after-commit   expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
     <int:after-rollback expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
 </int:transaction-synchronization-factory>
 
 
This TransactionSynchronizationFactory is then referenced in the transaction configuration of the poller:
 <int:poller fixed-delay="300" receive-timeout="500"
     max-messages-per-poll="1" task-executor="pool">
     <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
         isolation="READ_COMMITTED" transaction-manager="transactionManager" />
 </int:poller>
 
 

Parameters:
usingIdCache - When true the id cache will be used.
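
The id cache behaviour described above can be modelled in plain Java. The following is a minimal sketch under stated assumptions, not the store's actual code: IdCacheSketch and its commit and rollback methods are hypothetical, and a list stands in for the channel's rows, ordered oldest first. A poll skips any id already in the cache, and the id leaves the cache after commit or rollback, which is the role removeFromIdCache(String) plays in the XML configuration shown earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model only; names here are hypothetical, not library API.
class IdCacheSketch {

    // Stand-in for the channel's rows, oldest first.
    private final List<String> storedIds = new ArrayList<String>();

    // Ids of messages that some polling thread is currently processing.
    private final Set<String> idCache = ConcurrentHashMap.newKeySet();

    synchronized void add(String messageId) {
        storedIds.add(messageId);
    }

    // Returns the oldest id that is not being processed, or null if none.
    synchronized String poll() {
        for (String id : storedIds) {
            if (idCache.add(id)) { // add() returns false when the id is already cached
                return id;
            }
        }
        return null;
    }

    // After commit: the row is gone and the id is released from the cache.
    synchronized void commit(String messageId) {
        storedIds.remove(messageId);
        idCache.remove(messageId);
    }

    // After rollback: the row stays and becomes eligible for polling again.
    synchronized void rollback(String messageId) {
        idCache.remove(messageId);
    }

    int getSizeOfIdCache() {
        return idCache.size();
    }
}
```

In this model two consecutive polls return different ids instead of contending for the same oldest row, and a rolled-back id is handed out again on the next poll.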

afterPropertiesSet

public void afterPropertiesSet()
                        throws java.lang.Exception
Check mandatory properties (DataSource and setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)). If no MessageRowMapper was explicitly set using setMessageRowMapper(MessageRowMapper), the default MessageRowMapper will be instantiated using the specified deserializer and lobHandler. Also, if the jdbcTemplate's fetchSize property (JdbcTemplate.getFetchSize()) is not 1, a warning will be logged. When using the JdbcChannelMessageStore with Oracle, the fetchSize value of 1 is needed to ensure FIFO characteristics of polled messages. Please see the Oracle ChannelMessageStoreQueryProvider for more details.

Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
Throws:
java.lang.Exception

addMessageToGroup

public MessageGroup addMessageToGroup(java.lang.Object groupId,
                                      Message<?> message)
Store a message in the database. The groupId identifies the channel for which the message is to be stored. Keep in mind that the actual groupId (Channel Identifier) is converted to a String-based UUID identifier.

Specified by:
addMessageToGroup in interface MessageGroupStore
Parameters:
groupId - the group id to store the message under
message - a message
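
To make the group id conversion concrete, here is a hedged sketch of one way an arbitrary channel identifier could be turned into a String-based UUID. This page does not state the conversion strategy the store uses, so the approach below (a name-based UUID derived from the id's string form) is an assumption, and GroupKeySketch and toGroupKey are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Illustrative only; the conversion strategy shown here is an assumption.
class GroupKeySketch {

    static String toGroupKey(Object groupId) {
        if (groupId instanceof UUID) {
            // Already a UUID: use its canonical string form unchanged.
            return groupId.toString();
        }
        // Derive a deterministic, name-based UUID from the id's string form.
        byte[] bytes = groupId.toString().getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(bytes).toString();
    }
}
```

The property that matters for a channel store is determinism: the same channel identifier must always map to the same key, so that a message stored under a groupId can later be polled with that same groupId.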

completeGroup

public void completeGroup(java.lang.Object groupId)
Method not implemented.

Specified by:
completeGroup in interface MessageGroupStore
Throws:
java.lang.UnsupportedOperationException

doPollForMessage

protected Message<?> doPollForMessage(java.lang.String groupIdKey)
This method executes a call to the DB to get the oldest Message in the MessageGroup which in the context of the JdbcChannelMessageStore means the channel identifier.

Parameters:
groupIdKey - String representation of message group (Channel) ID
Returns:
a message; could be null if query produced no Messages

getMessageCount

@ManagedAttribute
public long getMessageCount()
Method not implemented.

Throws:
java.lang.UnsupportedOperationException

getMessageCountForAllMessageGroups

@ManagedAttribute
public int getMessageCountForAllMessageGroups()
Method not implemented.

Specified by:
getMessageCountForAllMessageGroups in interface MessageGroupStore
Overrides:
getMessageCountForAllMessageGroups in class AbstractMessageGroupStore
Returns:
the number of messages
Throws:
java.lang.UnsupportedOperationException

getMessageGroup

public MessageGroup getMessageGroup(java.lang.Object groupId)
Not fully used. Only wraps the provided group id.

Specified by:
getMessageGroup in interface MessageGroupStore
Returns:
a group of messages, empty if none exists for this key

getMessageGroupCount

@ManagedAttribute
public int getMessageGroupCount()
Method not implemented.

Specified by:
getMessageGroupCount in interface MessageGroupStore
Overrides:
getMessageGroupCount in class AbstractMessageGroupStore
Returns:
the number of message groups
Throws:
java.lang.UnsupportedOperationException

getQuery

protected java.lang.String getQuery(java.lang.String sqlQuery)
Replace patterns in the input to produce a valid SQL query. This implementation lazily initializes a simple map-based cache, only replacing the table prefix on the first access to a named query. Further accesses will be resolved from the cache.

Parameters:
sqlQuery - the SQL query to be transformed
Returns:
a transformed query with replacements
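
The lazy, map-based caching described for getQuery can be sketched as follows. This is not the library's source: PrefixedQueryCache is a hypothetical class, and both the %PREFIX% placeholder token and the table name in the usage note are assumptions made for illustration, since this page does not state them.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only; names and the placeholder token are assumptions.
class PrefixedQueryCache {

    // Hypothetical placeholder that marks where the table prefix goes.
    static final String PLACEHOLDER = "%PREFIX%";

    private final String tablePrefix;

    // Maps the raw query template to its resolved form.
    private final Map<String, String> cache = new ConcurrentHashMap<String, String>();

    PrefixedQueryCache(String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }

    String getQuery(String sqlQuery) {
        String resolved = cache.get(sqlQuery);
        if (resolved == null) {
            // First access: substitute the prefix and remember the result.
            resolved = sqlQuery.replace(PLACEHOLDER, tablePrefix);
            cache.put(sqlQuery, resolved);
        }
        return resolved;
    }

    int cachedQueryCount() {
        return cache.size();
    }
}
```

For instance, with a prefix of "MY_", the template "SELECT COUNT(*) FROM %PREFIX%CHANNEL_MESSAGE" resolves to "SELECT COUNT(*) FROM MY_CHANNEL_MESSAGE", and a second call with the same template is answered from the map without repeating the replacement.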

iterator

public java.util.Iterator<MessageGroup> iterator()
Method not implemented.

Specified by:
iterator in interface java.lang.Iterable<MessageGroup>
Specified by:
iterator in interface MessageGroupStore
Throws:
java.lang.UnsupportedOperationException

messageGroupSize

@ManagedAttribute
public int messageGroupSize(java.lang.Object groupId)
Returns the number of messages persisted for the specified channel id (groupId) and the specified region (setRegion(String)).

Specified by:
messageGroupSize in interface MessageGroupStore

pollMessageFromGroup

public Message<?> pollMessageFromGroup(java.lang.Object groupId)
Polls the database for a new message that is persisted for the given group id which represents the channel identifier.

Specified by:
pollMessageFromGroup in interface MessageGroupStore

removeMessageFromGroup

public MessageGroup removeMessageFromGroup(java.lang.Object groupId,
                                           Message<?> messageToRemove)
Remove a single message from the database.

Specified by:
removeMessageFromGroup in interface MessageGroupStore
Parameters:
groupId - The channel id to remove the message from
messageToRemove - The message to remove

removeFromIdCache

public void removeFromIdCache(java.lang.String messageId)

Remove a Message Id from the idCache. Should be used in conjunction with the Spring Integration Transaction Synchronization feature to remove a message from the Message Id cache once a transaction either succeeded or rolled back.

Only applicable if setUsingIdCache(boolean) is set to true.

Parameters:
messageId - the id of the message to remove from the idCache

getSizeOfIdCache

@ManagedMetric
public int getSizeOfIdCache()
Returns the size of the Message Id Cache, which caches Message Ids for those messages that are currently being processed.

Returns:
The size of the Message Id Cache

removeMessageGroup

public void removeMessageGroup(java.lang.Object groupId)
Will remove all messages from the message channel.

Specified by:
removeMessageGroup in interface MessageGroupStore
Parameters:
groupId - the id of the group to remove
