Class JdbcChannelMessageStore
- All Implemented Interfaces:
InitializingBean, BasicMessageGroupStore, ChannelMessageStore, PriorityCapableChannelMessageStore
Channel-specific implementation of BasicMessageGroupStore using a relational database via JDBC. This message store should be used for message channels only. As such, the JdbcChannelMessageStore uses database-specific SQL queries.
Contrary to the JdbcMessageStore, this implementation uses a single database table, optimized to operate like a queue. The SQL scripts for creating the table are packaged under org/springframework/integration/jdbc/schema-*.sql, where * denotes the target database type.
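As a minimal configuration sketch (not part of the original Javadoc), the store can be created with a DataSource and a query provider and then used to back a queue channel. The bean names, the "ORDERS" region, the "ordersChannel" group id, and the choice of OracleChannelMessageStoreQueryProvider are assumptions made for illustration; choose the provider that matches your database. MessageGroupQueue and QueueChannel come from spring-integration-core.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider;
import org.springframework.integration.store.MessageGroupQueue;

// Illustrative configuration only; all bean and channel names are hypothetical.
@Configuration
class ChannelStoreConfigSketch {

    @Bean
    JdbcChannelMessageStore channelMessageStore(DataSource dataSource) {
        // The DataSource constructor creates the internal JdbcTemplate.
        JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
        // Mandatory: the database-specific query provider.
        store.setChannelMessageStoreQueryProvider(new OracleChannelMessageStoreQueryProvider());
        // Optional: partition the table so other applications can share it.
        store.setRegion("ORDERS");
        return store;
    }

    @Bean
    QueueChannel ordersChannel(JdbcChannelMessageStore channelMessageStore) {
        // The group id passed here plays the role of the channel identifier.
        return new QueueChannel(new MessageGroupQueue(channelMessageStore, "ordersChannel"));
    }
}
```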
- Since:
- 2.2
- Author:
- Gunnar Hillert, Artem Bilan, Gary Russell, Meherzad Lahewala, Trung Pham
-
Field Summary
-
Constructor Summary
| Constructor | Description |
|---|---|
| JdbcChannelMessageStore() | Convenient constructor for configuration use. |
| JdbcChannelMessageStore(DataSource dataSource) | Create a MessageStore with all mandatory properties. |
-
Method Summary
| Modifier and Type | Method | Description |
|---|---|---|
| void | addAllowedPatterns(String... patterns) | Add patterns for packages/classes that are allowed to be deserialized. |
| MessageGroup | addMessageToGroup(Object groupId, Message<?> message) | Store a message in the database. |
| void | afterPropertiesSet() | Check mandatory properties (DataSource and setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)). |
| protected Message<?> | doPollForMessage(String groupIdKey) | This method executes a call to the DB to get the oldest Message in the MessageGroup, which in the context of the JdbcChannelMessageStore means the channel identifier. |
| MessageGroup | getMessageGroup(Object groupId) | Not fully used. |
| int | getMessageGroupCount() | Return the number of message groups in the store for the configured region. |
| protected MessageGroupFactory | getMessageGroupFactory() | |
| protected String | getQuery(org.springframework.integration.jdbc.store.JdbcChannelMessageStore.Query queryName, Supplier<String> queryProvider) | Replace patterns in the input to produce a valid SQL query. |
| int | getSizeOfIdCache() | Return the size of the Message Id Cache, which caches Message Ids for those messages that are currently being processed. |
| boolean | isPriorityEnabled() | |
| int | messageGroupSize(Object groupId) | Return the number of messages persisted for the specified channel id (groupId) and the specified region (setRegion(String)). |
| Message<?> | pollMessageFromGroup(Object groupId) | Poll the database for a new message that is persisted for the given group id, which represents the channel identifier. |
| void | removeFromIdCache(String messageId) | Remove a Message Id from the idCache. |
| void | removeMessageGroup(Object groupId) | Remove the message group with this id. |
| void | setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider) | Set the database-specific ChannelMessageStoreQueryProvider to use. |
| void | setDataSource(DataSource dataSource) | The JDBC DataSource to use when interacting with the database. |
| void | setDeserializer(Deserializer<? extends Message<?>> deserializer) | A converter for deserializing byte arrays to messages. |
| void | setJdbcTemplate(JdbcTemplate jdbcTemplate) | The JdbcOperations to use when interacting with the database. |
| void | setLobHandler(LobHandler lobHandler) | Override the LobHandler that is used to create and unpack large objects in SQL queries. |
| void | setMessageGroupFactory(MessageGroupFactory messageGroupFactory) | Specify the MessageGroupFactory to create MessageGroup objects where necessary. |
| void | setMessageRowMapper(MessageRowMapper messageRowMapper) | Allow for passing in a custom MessageRowMapper. |
| void | setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter preparedStatementSetter) | Set a ChannelMessageStorePreparedStatementSetter to insert messages into the database. |
| void | setPriorityEnabled(boolean priorityEnabled) | |
| void | setRegion(String region) | A unique grouping identifier for all messages persisted with this store. |
| void | setSerializer(Serializer<? super Message<?>> serializer) | A converter for serializing messages to byte arrays for storage. |
| void | setTablePrefix(String tablePrefix) | Public setter for the table prefix property. |
| void | setUsingIdCache(boolean usingIdCache) | Consider using this property when polling the database transactionally using multiple parallel threads, that is, when the poller is configured with a task executor. |
-
Field Details
-
DEFAULT_REGION
Default region property, used to partition the message store. For example, a separate Spring Integration application with overlapping channel names may use the same message store by providing a distinct region name.
-
DEFAULT_TABLE_PREFIX
Default value for the table prefix property.
-
-
Constructor Details
-
JdbcChannelMessageStore
public JdbcChannelMessageStore()
Convenient constructor for configuration use.
-
JdbcChannelMessageStore
Create a MessageStore with all mandatory properties. The passed-in DataSource is used to instantiate a JdbcTemplate with JdbcTemplate.setFetchSize(int) set to 1 and with JdbcTemplate.setMaxRows(int) set to 1.
- Parameters:
dataSource - a DataSource
-
-
Method Details
-
setDataSource
The JDBC DataSource to use when interacting with the database. The passed-in DataSource is used to instantiate a JdbcTemplate with JdbcTemplate.setFetchSize(int) set to 1 and with JdbcTemplate.setMaxRows(int) set to 1.
- Parameters:
dataSource - a DataSource
-
setDeserializer
A converter for deserializing byte arrays to messages.
- Parameters:
deserializer - the deserializer to set
-
addAllowedPatterns
Add patterns for packages/classes that are allowed to be deserialized. A class can be fully qualified, or a wildcard '*' is allowed at the beginning or end of the class name. Examples: com.foo.*, *.MyClass.
- Parameters:
patterns - the patterns.
- Since:
- 5.4
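To make the wildcard rule concrete, here is a small plain-Java sketch of leading/trailing '*' matching. AllowedPatternsSketch and its methods are hypothetical names used only for illustration; this is not the framework's matching code, which may handle more cases.

```java
import java.util.List;

// Hypothetical helper illustrating the pattern rule; not the framework's code.
final class AllowedPatternsSketch {

    private AllowedPatternsSketch() {
    }

    // True if className equals the pattern, or matches it with a single
    // wildcard '*' at the beginning or at the end of the pattern.
    static boolean matches(String pattern, String className) {
        if (pattern.equals("*")) {
            return true;
        }
        if (pattern.endsWith("*")) {
            // "com.foo.*" matches every class whose name starts with "com.foo."
            return className.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        if (pattern.startsWith("*")) {
            // "*.MyClass" matches every class whose name ends with ".MyClass"
            return className.endsWith(pattern.substring(1));
        }
        return pattern.equals(className);
    }

    // A class is allowed when at least one configured pattern matches it.
    static boolean isAllowed(List<String> patterns, String className) {
        return patterns.stream().anyMatch(p -> matches(p, className));
    }
}
```

With the patterns from the examples above, com.foo.Bar and org.example.MyClass would be accepted, while a class such as java.lang.Runtime would be rejected.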
-
setJdbcTemplate
The JdbcOperations to use when interacting with the database. Either this property or the dataSource can be set. Please consider passing in a JdbcTemplate with a fetchSize property of 1. This is particularly important for Oracle to ensure First In, First Out (FIFO) message retrieval characteristics.
- Parameters:
jdbcTemplate - a JdbcOperations
-
setLobHandler
Override the LobHandler that is used to create and unpack large objects in SQL queries. The default is fine for almost all platforms, but some Oracle drivers require a native implementation.
- Parameters:
lobHandler - a LobHandler
-
setMessageRowMapper
Allow for passing in a custom MessageRowMapper. The MessageRowMapper is used to convert the selected database row representing the persisted message into the actual Message object.
- Parameters:
messageRowMapper - Must not be null
-
setPreparedStatementSetter
public void setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter preparedStatementSetter)
Set a ChannelMessageStorePreparedStatementSetter to insert messages into the database.
- Parameters:
preparedStatementSetter - ChannelMessageStorePreparedStatementSetter to use. Must not be null
- Since:
- 5.0
-
setChannelMessageStoreQueryProvider
public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)
Set the database-specific ChannelMessageStoreQueryProvider to use. The ChannelMessageStoreQueryProvider supplies the SQL queries that the JdbcChannelMessageStore uses to retrieve messages from the database. See the JavaDocs of ChannelMessageStoreQueryProvider (all known implementing classes) for the implementations provided by the framework.
You can provide your own query implementations if you need to support additional databases and/or need to fine-tune the queries for your requirements.
- Parameters:
channelMessageStoreQueryProvider - Must not be null.
-
setRegion
A unique grouping identifier for all messages persisted with this store. Using multiple regions allows the store to be partitioned (if necessary) for different purposes. Defaults to DEFAULT_REGION.
- Parameters:
region - the region name to set
-
setSerializer
A converter for serializing messages to byte arrays for storage.
- Parameters:
serializer - The serializer to set
-
setTablePrefix
Public setter for the table prefix property. This will be prefixed to all the table names before queries are executed. Defaults to DEFAULT_TABLE_PREFIX.
- Parameters:
tablePrefix - the tablePrefix to set
-
setUsingIdCache
public void setUsingIdCache(boolean usingIdCache)
Consider using this property when polling the database transactionally using multiple parallel threads, that is, when the poller is configured with a task executor.
The issue is that pollMessageFromGroup(Object) looks for the oldest entry for a given channel (groupKey) and region (setRegion(String)). If you do that with multiple threads and you are using transactions, other threads may be waiting for that same locked row.
If using the provided OracleChannelMessageStoreQueryProvider, don't set usingIdCache to true, as the Oracle query will ignore locked rows.
Using the id cache, the JdbcChannelMessageStore will store each message id in an in-memory collection for the duration of processing. With that, any polling threads will explicitly exclude those messages from being polled.
For this to work, you must set up the corresponding TransactionSynchronizationFactory:
    <int:transaction-synchronization-factory id="syncFactory">
        <int:after-commit expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
        <int:after-rollback expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
    </int:transaction-synchronization-factory>
This TransactionSynchronizationFactory is then referenced in the transaction configuration of the poller:
    <int:poller fixed-delay="300" receive-timeout="500" max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
            isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
- Parameters:
usingIdCache - When true the id cache will be used.
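The id cache behavior described above can be modeled in plain Java. The sketch below is a simplified in-memory stand-in, assuming a list in place of the database table and explicit commit/rollback calls in place of the transaction synchronization callbacks; IdCacheSketch and its commit and rollback methods are hypothetical and do not reflect the store's internal code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the id cache; not the store's implementation.
final class IdCacheSketch {

    // Stand-in for the channel table: message ids ordered oldest first.
    private final List<String> storedIds = new ArrayList<>();

    // Ids of messages that some polling thread is currently processing.
    private final Set<String> idCache = ConcurrentHashMap.newKeySet();

    synchronized void add(String messageId) {
        this.storedIds.add(messageId);
    }

    // Return the oldest id that is not in flight and mark it as in flight.
    synchronized Optional<String> poll() {
        for (String id : this.storedIds) {
            if (this.idCache.add(id)) { // add() returns false if already cached
                return Optional.of(id);
            }
        }
        return Optional.empty();
    }

    // Mirrors the call made from the after-commit/after-rollback expressions.
    void removeFromIdCache(String messageId) {
        this.idCache.remove(messageId);
    }

    // After commit: the row is gone and the id leaves the cache.
    synchronized void commit(String messageId) {
        this.storedIds.remove(messageId);
        removeFromIdCache(messageId);
    }

    // After rollback: the row stays and becomes eligible for polling again.
    void rollback(String messageId) {
        removeFromIdCache(messageId);
    }

    int getSizeOfIdCache() {
        return this.idCache.size();
    }
}
```

In this model, two consecutive polls return two different messages even though neither has been committed yet, which is the effect the id cache is meant to achieve for parallel polling threads.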
-
setPriorityEnabled
public void setPriorityEnabled(boolean priorityEnabled)
-
isPriorityEnabled
public boolean isPriorityEnabled()
- Specified by:
isPriorityEnabled in interface PriorityCapableChannelMessageStore
- Returns:
- true if message priority is enabled in this channel message store.
-
setMessageGroupFactory
Specify the MessageGroupFactory to create MessageGroup objects where necessary. Defaults to SimpleMessageGroupFactory.
- Parameters:
messageGroupFactory - the MessageGroupFactory to use.
- Since:
- 4.3
-
getMessageGroupFactory
-
afterPropertiesSet
public void afterPropertiesSet()
Check mandatory properties (DataSource and setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)). If no MessageRowMapper or ChannelMessageStorePreparedStatementSetter was explicitly set using setMessageRowMapper(MessageRowMapper) and setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter) respectively, the default MessageRowMapper and ChannelMessageStorePreparedStatementSetter will be instantiated using the specified deserializer and lobHandler.
Also, if the jdbcTemplate's fetchSize property (JdbcTemplate.getFetchSize()) is not 1, a warning will be logged. When using the JdbcChannelMessageStore with Oracle, the fetchSize value of 1 is needed to ensure FIFO characteristics of polled messages. Please see the OracleChannelMessageStoreQueryProvider for more details.
- Specified by:
afterPropertiesSet in interface InitializingBean
-
addMessageToGroup
Store a message in the database. The groupId identifies the channel for which the message is to be stored. Keep in mind that the actual groupId (Channel Identifier) is converted to a String-based UUID identifier.
- Specified by:
addMessageToGroup in interface BasicMessageGroupStore
- Parameters:
groupId - the group id to store the message under
message - a message
- Returns:
- The message group.
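The conversion of a group id to a String-based UUID can be pictured with the following sketch. It assumes a name-based UUID derived from the id's string form; the source does not say how the framework performs the conversion, so GroupKeySketch and toGroupKey are hypothetical and the resulting values may differ from what the store actually writes.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical illustration of deriving a stable UUID string from a group id.
final class GroupKeySketch {

    private GroupKeySketch() {
    }

    static String toGroupKey(Object groupId) {
        if (groupId instanceof UUID) {
            // Already a UUID: use its canonical string form.
            return groupId.toString();
        }
        // Assumption: derive a deterministic, name-based UUID from the text form.
        byte[] bytes = String.valueOf(groupId).getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(bytes).toString();
    }
}
```

The property that matters for the store is determinism: the same channel identifier always maps to the same key, so messages added under a channel name can later be polled under that same name.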
-
getMessageGroup
Not fully used. Only wraps the provided group id.
- Specified by:
getMessageGroup in interface BasicMessageGroupStore
- Parameters:
groupId - The group identifier.
- Returns:
- A group of messages, empty if none exists for this key.
-
getMessageGroupCount
Return the number of message groups in the store for the configured region.
- Returns:
- The message group count.
-
getQuery
protected String getQuery(org.springframework.integration.jdbc.store.JdbcChannelMessageStore.Query queryName, Supplier<String> queryProvider)
Replace patterns in the input to produce a valid SQL query. This implementation lazily initializes a simple map-based cache, only replacing the table prefix on the first access to a named query. Further accesses will be resolved from the cache.
- Parameters:
queryName - The JdbcChannelMessageStore.Query to be transformed.
queryProvider - a supplier to provide the query template.
- Returns:
- A transformed query with replacements.
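The lazy, map-based caching described for getQuery can be sketched in plain Java as follows. The "%PREFIX%" placeholder, the String query names, and the QueryCacheSketch class are assumptions for illustration; the real method takes a JdbcChannelMessageStore.Query and its internals are not shown in this page.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical model of a lazily filled query cache with prefix replacement.
final class QueryCacheSketch {

    // Assumed placeholder that query templates use for the table prefix.
    static final String PREFIX_PLACEHOLDER = "%PREFIX%";

    private final Map<String, String> queryCache = new ConcurrentHashMap<>();

    private final String tablePrefix;

    QueryCacheSketch(String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }

    // The supplier is only invoked on the first access to a named query;
    // later calls with the same name are answered from the cache.
    String getQuery(String queryName, Supplier<String> queryProvider) {
        return this.queryCache.computeIfAbsent(queryName,
                name -> queryProvider.get().replace(PREFIX_PLACEHOLDER, this.tablePrefix));
    }
}
```

Passing a Supplier rather than a ready-made String means the template is only built when the cache misses, which keeps repeated polls cheap.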
-
messageGroupSize
Return the number of messages persisted for the specified channel id (groupId) and the specified region (setRegion(String)).
- Specified by:
messageGroupSize in interface BasicMessageGroupStore
- Parameters:
groupId - The group identifier.
- Returns:
- The message group size.
-
removeMessageGroup
Description copied from interface: BasicMessageGroupStore
Remove the message group with this id.
- Specified by:
removeMessageGroup in interface BasicMessageGroupStore
- Parameters:
groupId - The id of the group to remove.
-
pollMessageFromGroup
Poll the database for a new message that is persisted for the given group id, which represents the channel identifier.
- Specified by:
pollMessageFromGroup in interface BasicMessageGroupStore
- Parameters:
groupId - The group identifier.
- Returns:
- The message.
-
doPollForMessage
This method executes a call to the DB to get the oldest Message in the MessageGroup, which in the context of the JdbcChannelMessageStore means the channel identifier.
- Parameters:
groupIdKey - String representation of message group (Channel) ID
- Returns:
- a message; could be null if query produced no Messages
-
removeFromIdCache
Remove a Message Id from the idCache. Should be used in conjunction with the Spring Integration Transaction Synchronization feature to remove a message from the Message Id cache once a transaction has either committed or rolled back. Only applicable if setUsingIdCache(boolean) is set to true.
- Parameters:
messageId - The message identifier.
-
getSizeOfIdCache
Return the size of the Message Id Cache, which caches Message Ids for those messages that are currently being processed.
- Returns:
- The size of the Message Id Cache
-