@ManagedResource public class JdbcChannelMessageStore extends AbstractMessageGroupStore implements org.springframework.beans.factory.InitializingBean
Channel-specific implementation of MessageGroupStore
using a relational
database via JDBC.
This message store shall be used for message channels only.
As such, the JdbcChannelMessageStore
uses database specific SQL queries.
Contrary to the JdbcMessageStore
, this implementation uses one single
database table only. The SQL scripts to create the necessary table are packaged
under org/springframework/integration/jdbc/messagestore/channel/schema-*.sql
,
where *
denotes the target database type.
MessageGroupStore.MessageGroupCallback
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
CREATED_DATE_KEY
The name of the message header that stores a timestamp for the time the message was inserted.
|
static int |
DEFAULT_LONG_STRING_LENGTH |
static java.lang.String |
DEFAULT_REGION
Default region property, used to partition the message store.
|
static java.lang.String |
DEFAULT_TABLE_PREFIX
Default value for the table prefix property.
|
static java.lang.String |
SAVED_KEY
The name of the message header that stores a flag to indicate that the message has been saved.
|
Constructor and Description |
---|
JdbcChannelMessageStore()
Convenient constructor for configuration use.
|
JdbcChannelMessageStore(javax.sql.DataSource dataSource)
Create a
MessageStore with all mandatory properties. |
Modifier and Type | Method and Description |
---|---|
MessageGroup |
addMessageToGroup(java.lang.Object groupId,
Message<?> message)
Store a message in the database.
|
void |
afterPropertiesSet()
Check mandatory properties (
DataSource and
setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider) ). |
void |
completeGroup(java.lang.Object groupId)
Method not implemented.
|
protected Message<?> |
doPollForMessage(java.lang.String groupIdKey)
This method executes a call to the DB to get the oldest Message in the
MessageGroup which in the context of the
JdbcChannelMessageStore
means the channel identifier. |
long |
getMessageCount()
Method not implemented.
|
int |
getMessageCountForAllMessageGroups()
Method not implemented.
|
MessageGroup |
getMessageGroup(java.lang.Object groupId)
Not fully used.
|
int |
getMessageGroupCount()
Method not implemented.
|
protected java.lang.String |
getQuery(java.lang.String sqlQuery)
Replace patterns in the input to produce a valid SQL query.
|
int |
getSizeOfIdCache()
Returns the size of the Message Id Cache, which caches Message Ids for
those messages that are currently being processed.
|
java.util.Iterator<MessageGroup> |
iterator()
Method not implemented.
|
int |
messageGroupSize(java.lang.Object groupId)
Returns the number of messages persisted for the specified channel id (groupId)
and the specified region (
setRegion(String) ). |
Message<?> |
pollMessageFromGroup(java.lang.Object groupId)
Polls the database for a new message that is persisted for the given
group id which represents the channel identifier.
|
void |
removeFromIdCache(java.lang.String messageId)
Remove a Message Id from the idCache.
|
MessageGroup |
removeMessageFromGroup(java.lang.Object groupId,
Message<?> messageToRemove)
Remove a single message from the database.
|
void |
removeMessageGroup(java.lang.Object groupId)
Will remove all messages from the message channel.
|
void |
setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)
Sets the database specific
ChannelMessageStoreQueryProvider to use. |
void |
setDataSource(javax.sql.DataSource dataSource)
The JDBC
DataSource to use when interacting with the database. |
void |
setDeserializer(org.springframework.core.serializer.Deserializer<? extends Message<?>> deserializer)
A converter for deserializing byte arrays to messages.
|
void |
setJdbcTemplate(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate)
The
JdbcOperations to use when interacting with the database. |
void |
setLastReleasedSequenceNumberForGroup(java.lang.Object groupId,
int sequenceNumber)
Method not implemented.
|
void |
setLobHandler(org.springframework.jdbc.support.lob.LobHandler lobHandler)
Override the
LobHandler that is used to create and unpack large objects in SQL queries. |
void |
setMessageRowMapper(MessageRowMapper messageRowMapper)
Allows for passing in a custom
MessageRowMapper . |
void |
setRegion(java.lang.String region)
A unique grouping identifier for all messages persisted with this store.
|
void |
setSerializer(org.springframework.core.serializer.Serializer<? super Message<?>> serializer)
A converter for serializing messages to byte arrays for storage.
|
void |
setTablePrefix(java.lang.String tablePrefix)
Public setter for the table prefix property.
|
void |
setUsingIdCache(boolean usingIdCache)
Consider using this property when polling the database transactionally
using multiple parallel threads, meaning when the configured poller is configured
using a task executor.
|
expireMessageGroups, isTimeoutOnIdle, registerMessageGroupExpiryCallback, setExpiryCallbacks, setTimeoutOnIdle
public static final java.lang.String DEFAULT_TABLE_PREFIX
public static final java.lang.String DEFAULT_REGION
public static final int DEFAULT_LONG_STRING_LENGTH
public static final java.lang.String SAVED_KEY
public static final java.lang.String CREATED_DATE_KEY
public JdbcChannelMessageStore()
public JdbcChannelMessageStore(javax.sql.DataSource dataSource)
MessageStore
with all mandatory properties. The passed-in
DataSource
is used to instantiate a JdbcTemplate
with JdbcTemplate.setFetchSize(int)
set to 1
and with JdbcTemplate.setMaxRows(int)
set to 1
.dataSource
- a DataSource
public void setDataSource(javax.sql.DataSource dataSource)
DataSource
to use when interacting with the database.
The passed-in DataSource
is used to instantiate a JdbcTemplate
with JdbcTemplate.setFetchSize(int)
set to 1
and with JdbcTemplate.setMaxRows(int)
set to 1
.dataSource
- a DataSource
public void setDeserializer(org.springframework.core.serializer.Deserializer<? extends Message<?>> deserializer)
deserializer
- the deserializer to setpublic void setJdbcTemplate(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate)
JdbcOperations
to use when interacting with the database. Either
this property can be set or the dataSource
.
Please consider passing in a JdbcTemplate
with a fetchSize property
of 1. This is particularly important for Oracle to ensure First In, First Out (FIFO)
message retrieval characteristics.jdbcTemplate
- a JdbcOperations
public void setLastReleasedSequenceNumberForGroup(java.lang.Object groupId, int sequenceNumber)
setLastReleasedSequenceNumberForGroup
in interface MessageGroupStore
java.lang.UnsupportedOperationException
public void setLobHandler(org.springframework.jdbc.support.lob.LobHandler lobHandler)
LobHandler
that is used to create and unpack large objects in SQL queries. The default is
fine for almost all platforms, but some Oracle drivers require a native implementation.lobHandler
- a LobHandler
public void setMessageRowMapper(MessageRowMapper messageRowMapper)
MessageRowMapper
. The MessageRowMapper
is used to convert the selected database row representing the persisted
message into the actual Message
object.messageRowMapper
- Must not be nullpublic void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider)
Sets the database specific ChannelMessageStoreQueryProvider
to use. The JdbcChannelMessageStore
provides the SQL queries to retrieve messages from the database. The
following ChannelMessageStoreQueryProvider
are provided:
DerbyChannelMessageStoreQueryProvider
MySqlChannelMessageStoreQueryProvider
OracleChannelMessageStoreQueryProvider
PostgresChannelMessageStoreQueryProvider
Beyond, you can provide your own query implementations, in case you need to support additional databases and/or need to fine-tune the queries for your requirements.
channelMessageStoreQueryProvider
- Must not be null.public void setRegion(java.lang.String region)
DEFAULT_REGION
.region
- the region name to setpublic void setSerializer(org.springframework.core.serializer.Serializer<? super Message<?>> serializer)
serializer
- The serializer to setpublic void setTablePrefix(java.lang.String tablePrefix)
DEFAULT_TABLE_PREFIX
.tablePrefix
- the tablePrefix to setpublic void setUsingIdCache(boolean usingIdCache)
Consider using this property when polling the database transactionally using multiple parallel threads, meaning when the configured poller is configured using a task executor.
The issue is that the pollMessageFromGroup(Object)
looks for the
oldest entry for a giving channel (groupKey) and region (setRegion(String)
).
If you do that with multiple threads and you are using transactions, other
threads may be waiting for that same locked row.
If using the provided OracleChannelMessageStoreQueryProvider
, don't set usingIdCache
to true, as the Oracle query will ignore locked rows.
Using the id cache, the JdbcChannelMessageStore
will store each
message id in an in-memory collection for the duration of processing. With
that, any polling threads will explicitly exclude those messages from
being polled.
For this to work, you must setup the corresponding
TransactionSynchronizationFactory
:
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@jdbcChannelMessageStore.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>
This TransactionSynchronizationFactory
is then referenced in the
transaction configuration of the poller:
<int:poller fixed-delay="300" receive-timeout="500"
max-messages-per-poll="1" task-executor="pool">
<int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
isolation="READ_COMMITTED" transaction-manager="transactionManager" />
</int:poller>
usingIdCache
- When true
the id cache will be used.public void afterPropertiesSet() throws java.lang.Exception
DataSource
and
setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)
). If no MessageRowMapper
was
explicitly set using setMessageRowMapper(MessageRowMapper)
, the default
MessageRowMapper
will be instantiate using the specified deserializer
and lobHandler
.
Also, if the jdbcTemplate's fetchSize property (JdbcTemplate.getFetchSize()
)
is not 1, a warning will be logged. When using the JdbcChannelMessageStore
with Oracle, the fetchSize value of 1 is needed to ensure FIFO characteristics
of polled messages. Please see the Oracle ChannelMessageStoreQueryProvider
for more details.afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
java.lang.Exception
public MessageGroup addMessageToGroup(java.lang.Object groupId, Message<?> message)
addMessageToGroup
in interface MessageGroupStore
groupId
- the group id to store the message undermessage
- a messagepublic void completeGroup(java.lang.Object groupId)
completeGroup
in interface MessageGroupStore
java.lang.UnsupportedOperationException
protected Message<?> doPollForMessage(java.lang.String groupIdKey)
JdbcChannelMessageStore
means the channel identifier.groupIdKey
- String representation of message group (Channel) ID@ManagedAttribute public long getMessageCount()
java.lang.UnsupportedOperationException
@ManagedAttribute public int getMessageCountForAllMessageGroups()
getMessageCountForAllMessageGroups
in interface MessageGroupStore
getMessageCountForAllMessageGroups
in class AbstractMessageGroupStore
java.lang.UnsupportedOperationException
public MessageGroup getMessageGroup(java.lang.Object groupId)
getMessageGroup
in interface MessageGroupStore
@ManagedAttribute public int getMessageGroupCount()
getMessageGroupCount
in interface MessageGroupStore
getMessageGroupCount
in class AbstractMessageGroupStore
java.lang.UnsupportedOperationException
protected java.lang.String getQuery(java.lang.String sqlQuery)
sqlQuery
- the SQL query to be transformedpublic java.util.Iterator<MessageGroup> iterator()
iterator
in interface java.lang.Iterable<MessageGroup>
iterator
in interface MessageGroupStore
java.lang.UnsupportedOperationException
@ManagedAttribute public int messageGroupSize(java.lang.Object groupId)
setRegion(String)
).messageGroupSize
in interface MessageGroupStore
public Message<?> pollMessageFromGroup(java.lang.Object groupId)
pollMessageFromGroup
in interface MessageGroupStore
public MessageGroup removeMessageFromGroup(java.lang.Object groupId, Message<?> messageToRemove)
removeMessageFromGroup
in interface MessageGroupStore
groupId
- The channel id to remove the message frommessageToRemove
- The message to removepublic void removeFromIdCache(java.lang.String messageId)
Remove a Message Id from the idCache. Should be used in conjunction with the Spring Integration Transaction Synchronization feature to remove a message from the Message Id cache once a transaction either succeeded or rolled back.
Only applicable if setUsingIdCache(boolean)
is set to
true
messageId
- @ManagedMetric public int getSizeOfIdCache()
public void removeMessageGroup(java.lang.Object groupId)
removeMessageGroup
in interface MessageGroupStore
groupId
- the id of the group to remove