public class JdbcMessageStore extends AbstractMessageGroupStore implements MessageStore
MessageStore using a relational database via JDBC. SQL scripts to create the necessary
tables are packaged as org/springframework/integration/jdbc/schema-*.sql, where * is the
target database type.
If you intend backing a MessageChannel
using a JDBC-based Message Store,
please consider using the channel-specific JdbcChannelMessageStore instead.
This implementation is intended for correlation components (e.g. <aggregator>),
<delayer> and similar.
MessageGroupStore.MessageGroupCallback| Modifier and Type | Field and Description |
|---|---|
static String |
DEFAULT_TABLE_PREFIX
Default value for the table prefix property.
|
logger| Constructor and Description |
|---|
JdbcMessageStore(DataSource dataSource)
Create a
MessageStore with all mandatory properties. |
JdbcMessageStore(JdbcOperations jdbcOperations)
Create a
MessageStore with all mandatory properties. |
| Modifier and Type | Method and Description |
|---|---|
void |
addAllowedPatterns(String... patterns)
Add patterns for packages/classes that are allowed to be deserialized.
|
<T> Message<T> |
addMessage(Message<T> message)
Put the provided Message into the MessageStore.
|
void |
addMessagesToGroup(Object groupId,
Message<?>... messages)
Store messages with an association to a group id.
|
void |
completeGroup(Object groupId)
Completes this MessageGroup.
|
protected Message<?> |
doPollForMessage(String groupIdKey)
This method executes a call to the DB to get the oldest Message in the MessageGroup
Override this method if need to.
|
protected JdbcOperations |
getJdbcOperations()
To be used to get a reference to JdbcOperations
in case this class is subclassed
|
Message<?> |
getMessage(UUID id) |
long |
getMessageCount()
Optional attribute giving the number of messages in the store.
|
int |
getMessageCountForAllMessageGroups()
Optional attribute giving the number of messages in the store over all groups.
|
MessageGroup |
getMessageGroup(Object groupId)
Return all Messages currently in the MessageStore that were stored using
BasicMessageGroupStore.addMessageToGroup(Object, Message) with this group id. |
int |
getMessageGroupCount()
Optional attribute giving the number of message groups.
|
MessageMetadata |
getMessageMetadata(UUID id)
|
Collection<Message<?>> |
getMessagesForGroup(Object groupId)
Retrieve messages for the provided group id.
|
Message<?> |
getOneMessageFromGroup(Object groupId)
Return the one
Message from MessageGroup. |
protected String |
getQuery(org.springframework.integration.jdbc.store.JdbcMessageStore.Query base)
Replace patterns in the input to produce a valid SQL query.
|
Iterator<MessageGroup> |
iterator() |
int |
messageGroupSize(Object groupId)
Returns the size of this MessageGroup.
|
Message<?> |
pollMessageFromGroup(Object groupId)
Polls Message from this
MessageGroup (in FIFO style if supported by the implementation)
while also removing the polled Message |
Message<?> |
removeMessage(UUID id)
Remove the Message with the given id from the MessageStore, if present, and return it.
|
void |
removeMessageGroup(Object groupId)
Remove the message group with this id.
|
void |
removeMessagesFromGroup(Object groupId,
Collection<Message<?>> messages)
Persist the deletion of messages from the group.
|
void |
setDeserializer(Deserializer<? extends Message<?>> deserializer)
A converter for deserializing byte arrays to messages.
|
void |
setLastReleasedSequenceNumberForGroup(Object groupId,
int sequenceNumber)
Allows you to set the sequence number of the last released Message.
|
void |
setLobHandler(LobHandler lobHandler)
Override the
LobHandler that is used to create and unpack large objects in SQL queries. |
void |
setRegion(String region)
A unique grouping identifier for all messages persisted with this store.
|
void |
setSerializer(Serializer<? super Message<?>> serializer)
A converter for serializing messages to byte arrays for storage.
|
void |
setTablePrefix(String tablePrefix)
Public setter for the table prefix property.
|
addMessageToGroup, copy, expireMessageGroups, getGroupMetadata, getMessageGroupFactory, isTimeoutOnIdle, registerMessageGroupExpiryCallback, removeMessagesFromGroup, setExpiryCallbacks, setLazyLoadMessageGroups, setTimeoutOnIdlegetRemoveBatchSize, setMessageGroupFactory, setRemoveBatchSizeclone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitforEach, spliteratorpublic static final String DEFAULT_TABLE_PREFIX
public JdbcMessageStore(DataSource dataSource)
MessageStore with all mandatory properties.dataSource - a DataSourcepublic JdbcMessageStore(JdbcOperations jdbcOperations)
MessageStore with all mandatory properties.jdbcOperations - a JdbcOperationspublic void setTablePrefix(String tablePrefix)
DEFAULT_TABLE_PREFIX.tablePrefix - the tablePrefix to setpublic void setRegion(String region)
DEFAULT.region - the region name to setpublic void setLobHandler(LobHandler lobHandler)
LobHandler that is used to create and unpack large objects in SQL queries. The default is
fine for almost all platforms, but some Oracle drivers require a native implementation.lobHandler - a LobHandlerpublic void setSerializer(Serializer<? super Message<?>> serializer)
serializer - the serializer to setpublic void setDeserializer(Deserializer<? extends Message<?>> deserializer)
deserializer - the deserializer to setpublic void addAllowedPatterns(String... patterns)
com.foo.*, *.MyClass.patterns - the patterns.public Message<?> removeMessage(UUID id)
MessageStoreremoveMessage in interface MessageStoreid - THe message identifier.@ManagedAttribute public long getMessageCount()
MessageStoregetMessageCount in interface MessageStorepublic Message<?> getMessage(UUID id)
getMessage in interface MessageStoreid - The message identifier.public MessageMetadata getMessageMetadata(UUID id)
MessageStoregetMessageMetadata in interface MessageStoreid - The message identifier.public <T> Message<T> addMessage(Message<T> message)
MessageStoreMessageStore.getMessage(UUID) and MessageStore.removeMessage(UUID) behave properly. Since messages are
immutable, putting the same message more than once is a no-op.addMessage in interface MessageStoreT - The payload type.message - The message.public void addMessagesToGroup(Object groupId, Message<?>... messages)
MessageGroupStoreaddMessagesToGroup in interface MessageGroupStoregroupId - The group id to store messages under.messages - The messages to add.@ManagedAttribute public int getMessageGroupCount()
MessageGroupStoregetMessageGroupCount in interface MessageGroupStoregetMessageGroupCount in class AbstractMessageGroupStore@ManagedAttribute public int getMessageCountForAllMessageGroups()
MessageGroupStoregetMessageCountForAllMessageGroups in interface MessageGroupStoregetMessageCountForAllMessageGroups in class AbstractMessageGroupStore@ManagedAttribute public int messageGroupSize(Object groupId)
BasicMessageGroupStoremessageGroupSize in interface BasicMessageGroupStoregroupId - The group identifier.public MessageGroup getMessageGroup(Object groupId)
BasicMessageGroupStoreBasicMessageGroupStore.addMessageToGroup(Object, Message) with this group id.getMessageGroup in interface BasicMessageGroupStoregroupId - The group identifier.public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messages)
MessageGroupStoreremoveMessagesFromGroup in interface MessageGroupStoregroupId - The groupId for the group containing the message(s).messages - The messages to be removed.public void removeMessageGroup(Object groupId)
BasicMessageGroupStoreremoveMessageGroup in interface BasicMessageGroupStoregroupId - The id of the group to remove.public void completeGroup(Object groupId)
MessageGroupStorecompleteGroup in interface MessageGroupStoregroupId - The group identifier.public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber)
MessageGroupStoresetLastReleasedSequenceNumberForGroup in interface MessageGroupStoregroupId - The group identifier.sequenceNumber - The sequence number.public Message<?> pollMessageFromGroup(Object groupId)
BasicMessageGroupStoreMessageGroup (in FIFO style if supported by the implementation)
while also removing the polled MessagepollMessageFromGroup in interface BasicMessageGroupStoregroupId - The group identifier.public Message<?> getOneMessageFromGroup(Object groupId)
MessageGroupStoreMessage from MessageGroup.getOneMessageFromGroup in interface MessageGroupStoregroupId - The group identifier.Message.public Collection<Message<?>> getMessagesForGroup(Object groupId)
MessageGroupStoregetMessagesForGroup in interface MessageGroupStoregroupId - The group id to retrieve messages for.public Iterator<MessageGroup> iterator()
iterator in interface Iterable<MessageGroup>iterator in interface MessageGroupStoreMessageGroups.protected String getQuery(org.springframework.integration.jdbc.store.JdbcMessageStore.Query base)
base - the SQL query to be transformedprotected JdbcOperations getJdbcOperations()
protected Message<?> doPollForMessage(String groupIdKey)
groupIdKey - String representation of message group ID