@ManagedResource public class JdbcChannelMessageStore extends Object implements PriorityCapableChannelMessageStore, InitializingBean, BeanFactoryAware
 Channel-specific implementation of MessageGroupStore using a relational
 database via JDBC.
 This message store shall be used for message channels only.
 
 As such, the JdbcChannelMessageStore uses database specific SQL queries.
 
 Contrary to the JdbcMessageStore, this implementation uses one single
 database table only. The SQL scripts to create the necessary table are packaged
 under org/springframework/integration/jdbc/messagestore/channel/schema-*.sql,
 where * denotes the target database type.
 
| Modifier and Type | Field and Description | 
|---|---|
| static String | CREATED_DATE_KEYThe name of the message header that stores a timestamp for the time the message was inserted. | 
| static String | DEFAULT_REGIONDefault region property, used to partition the message store. | 
| static String | DEFAULT_TABLE_PREFIXDefault value for the table prefix property. | 
| static String | SAVED_KEYThe name of the message header that stores a flag to indicate that the message has been saved. | 
| Constructor and Description | 
|---|
| JdbcChannelMessageStore()Convenient constructor for configuration use. | 
| JdbcChannelMessageStore(DataSource dataSource)Create a  MessageStorewith all mandatory properties. | 
| Modifier and Type | Method and Description | 
|---|---|
| MessageGroup | addMessageToGroup(Object groupId,
                 Message<?> message)Store a message in the database. | 
| void | afterPropertiesSet()Check mandatory properties ( DataSourceandsetChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)). | 
| protected Message<?> | doPollForMessage(String groupIdKey)This method executes a call to the DB to get the oldest Message in the
 MessageGroup which in the context of the  JdbcChannelMessageStoremeans the channel identifier. | 
| MessageGroup | getMessageGroup(Object groupId)Not fully used. | 
| int | getMessageGroupCount()Method not implemented. | 
| protected String | getQuery(String sqlQuery)Replace patterns in the input to produce a valid SQL query. | 
| int | getSizeOfIdCache()Returns the size of the Message Id Cache, which caches Message Ids for
 those messages that are currently being processed. | 
| boolean | isPriorityEnabled() | 
| int | messageGroupSize(Object groupId)Returns the number of messages persisted for the specified channel id (groupId)
 and the specified region ( setRegion(String)). | 
| Message<?> | pollMessageFromGroup(Object groupId)Polls the database for a new message that is persisted for the given
 group id which represents the channel identifier. | 
| void | removeFromIdCache(String messageId)Remove a Message Id from the idCache. | 
| void | removeMessageGroup(Object groupId)Remove the message group with this id. | 
| void | setBeanFactory(BeanFactory beanFactory) | 
| void | setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)
 Sets the database specific  ChannelMessageStoreQueryProviderto use. | 
| void | setDataSource(DataSource dataSource)The JDBC  DataSourceto use when interacting with the database. | 
| void | setDeserializer(Deserializer<? extends Message<?>> deserializer)A converter for deserializing byte arrays to messages. | 
| void | setJdbcTemplate(JdbcTemplate jdbcTemplate)The  JdbcOperationsto use when interacting with the database. | 
| void | setLobHandler(LobHandler lobHandler)Override the  LobHandlerthat is used to create and unpack large objects in SQL queries. | 
| void | setMessageRowMapper(MessageRowMapper messageRowMapper)Allows for passing in a custom  MessageRowMapper. | 
| void | setPriorityEnabled(boolean priorityEnabled) | 
| void | setRegion(String region)A unique grouping identifier for all messages persisted with this store. | 
| void | setSerializer(Serializer<? super Message<?>> serializer)A converter for serializing messages to byte arrays for storage. | 
| void | setTablePrefix(String tablePrefix)Public setter for the table prefix property. | 
| void | setUsingIdCache(boolean usingIdCache)Consider using this property when polling the database transactionally
 using multiple parallel threads, meaning when the configured poller is configured
 using a task executor. | 
public static final String DEFAULT_TABLE_PREFIX
public static final String DEFAULT_REGION
public static final String SAVED_KEY
public static final String CREATED_DATE_KEY
public JdbcChannelMessageStore()
public JdbcChannelMessageStore(DataSource dataSource)
MessageStore with all mandatory properties. The passed-in
 DataSource is used to instantiate a JdbcTemplate
 with JdbcTemplate.setFetchSize(int) set to 1
 and with JdbcTemplate.setMaxRows(int) set to 1.dataSource - a DataSourcepublic void setDataSource(DataSource dataSource)
DataSource to use when interacting with the database.
 The passed-in DataSource is used to instantiate a JdbcTemplate
 with JdbcTemplate.setFetchSize(int) set to 1
 and with JdbcTemplate.setMaxRows(int) set to 1.dataSource - a DataSourcepublic void setDeserializer(Deserializer<? extends Message<?>> deserializer)
deserializer - the deserializer to setpublic void setJdbcTemplate(JdbcTemplate jdbcTemplate)
JdbcOperations to use when interacting with the database. Either
 this property can be set or the dataSource.
 Please consider passing in a JdbcTemplate with a fetchSize property
 of 1. This is particularly important for Oracle to ensure First In, First Out (FIFO)
 message retrieval characteristics.jdbcTemplate - a JdbcOperationspublic void setLobHandler(LobHandler lobHandler)
LobHandler that is used to create and unpack large objects in SQL queries. The default is
 fine for almost all platforms, but some Oracle drivers require a native implementation.lobHandler - a LobHandlerpublic void setMessageRowMapper(MessageRowMapper messageRowMapper)
MessageRowMapper. The MessageRowMapper
 is used to convert the selected database row representing the persisted
 message into the actual Message object.messageRowMapper - Must not be nullpublic void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)
 Sets the database specific ChannelMessageStoreQueryProvider to use. The JdbcChannelMessageStore
 provides the SQL queries to retrieve messages from the database. The
 following ChannelMessageStoreQueryProvider are provided:
 
DerbyChannelMessageStoreQueryProviderMySqlChannelMessageStoreQueryProviderOracleChannelMessageStoreQueryProviderPostgresChannelMessageStoreQueryProviderBeyond, you can provide your own query implementations, in case you need to support additional databases and/or need to fine-tune the queries for your requirements.
channelMessageStoreQueryProvider - Must not be null.public void setRegion(String region)
DEFAULT_REGIONregion - the region name to setpublic void setSerializer(Serializer<? super Message<?>> serializer)
serializer - The serializer to setpublic void setTablePrefix(String tablePrefix)
DEFAULT_TABLE_PREFIX.tablePrefix - the tablePrefix to setpublic void setUsingIdCache(boolean usingIdCache)
Consider using this property when polling the database transactionally using multiple parallel threads, meaning when the configured poller is configured using a task executor.
The issue is that the pollMessageFromGroup(Object) looks for the
 oldest entry for a giving channel (groupKey) and region (setRegion(String)).
 If you do that with multiple threads and you are using transactions, other
 threads may be waiting for that same locked row.
If using the provided OracleChannelMessageStoreQueryProvider, don't set usingIdCache
 to true, as the Oracle query will ignore locked rows.
Using the id cache, the JdbcChannelMessageStore will store each
 message id in an in-memory collection for the duration of processing. With
 that, any polling threads will explicitly exclude those messages from
 being polled.
For this to work, you must setup the corresponding
 TransactionSynchronizationFactory:
 <int:transaction-synchronization-factory id="syncFactory">
     <int:after-commit   expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
     <int:after-rollback expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
 </int:transaction-synchronization-factory>
 
 
 This TransactionSynchronizationFactory is then referenced in the
 transaction configuration of the poller:
 
 <int:poller fixed-delay="300" receive-timeout="500"
     max-messages-per-poll="1" task-executor="pool">
     <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
         isolation="READ_COMMITTED" transaction-manager="transactionManager" />
 </int:poller>
 
 usingIdCache - When true the id cache will be used.public void setPriorityEnabled(boolean priorityEnabled)
public boolean isPriorityEnabled()
isPriorityEnabled in interface PriorityCapableChannelMessageStorepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException
setBeanFactory in interface BeanFactoryAwareBeansExceptionpublic void afterPropertiesSet()
                        throws Exception
DataSource and
 setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)). If no MessageRowMapper was
 explicitly set using setMessageRowMapper(MessageRowMapper), the default
 MessageRowMapper will be instantiate using the specified deserializer
 and lobHandler.
 Also, if the jdbcTemplate's fetchSize property (JdbcTemplate.getFetchSize())
 is not 1, a warning will be logged. When using the JdbcChannelMessageStore
 with Oracle, the fetchSize value of 1 is needed to ensure FIFO characteristics
 of polled messages. Please see the Oracle ChannelMessageStoreQueryProvider for more details.afterPropertiesSet in interface InitializingBeanException - Any Exception.public MessageGroup addMessageToGroup(Object groupId, Message<?> message)
addMessageToGroup in interface BasicMessageGroupStoregroupId - the group id to store the message undermessage - a messagepublic MessageGroup getMessageGroup(Object groupId)
getMessageGroup in interface BasicMessageGroupStoregroupId - The group identifier.@ManagedAttribute public int getMessageGroupCount()
UnsupportedOperationException - Method not supported.protected String getQuery(String sqlQuery)
sqlQuery - The SQL query to be transformed.@ManagedAttribute public int messageGroupSize(Object groupId)
setRegion(String)).messageGroupSize in interface BasicMessageGroupStoregroupId - The group identifier.public void removeMessageGroup(Object groupId)
BasicMessageGroupStoreremoveMessageGroup in interface BasicMessageGroupStoregroupId - The id of the group to remove.public Message<?> pollMessageFromGroup(Object groupId)
pollMessageFromGroup in interface BasicMessageGroupStoregroupId - The group identifier.protected Message<?> doPollForMessage(String groupIdKey)
JdbcChannelMessageStore
 means the channel identifier.groupIdKey - String representation of message group (Channel) IDpublic void removeFromIdCache(String messageId)
Remove a Message Id from the idCache. Should be used in conjunction with the Spring Integration Transaction Synchronization feature to remove a message from the Message Id cache once a transaction either succeeded or rolled back.
Only applicable if setUsingIdCache(boolean) is set to
 true
messageId - The message identifier.@ManagedMetric public int getSizeOfIdCache()