Class JdbcMessageStore
java.lang.Object
org.springframework.integration.store.AbstractBatchingMessageGroupStore
org.springframework.integration.store.AbstractMessageGroupStore
org.springframework.integration.jdbc.store.JdbcMessageStore
- All Implemented Interfaces:
Iterable<MessageGroup>, Aware, BeanClassLoaderAware, Lifecycle, Phased, SmartLifecycle, BasicMessageGroupStore, MessageGroupStore, MessageStore
public class JdbcMessageStore
extends AbstractMessageGroupStore
implements MessageStore, BeanClassLoaderAware, SmartLifecycle
Implementation of MessageStore using a relational database via JDBC. SQL scripts to create the necessary tables are packaged as org/springframework/integration/jdbc/schema-*.sql, where * is the target database type.
If you intend to back a MessageChannel with a JDBC-based message store, consider using the channel-specific JdbcChannelMessageStore instead. This implementation is intended for correlation components (e.g. <aggregator>), <delayer> and similar.
This class implements SmartLifecycle and calls getMessageGroupCount() on start() to check whether the required tables are present in the database. The application context fails to start if the tables are not present. This check can be disabled via setCheckDatabaseOnStart(boolean).
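As an illustration of the description above, a store could be declared as a bean roughly as follows. This is a minimal sketch, not a recommended setup: the configuration class name, the region name "ORDERS", and the presence of a DataSource bean defined elsewhere are all assumptions, and the matching schema-*.sql script is assumed to have been applied already.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.store.JdbcMessageStore;

// Hypothetical configuration class; only the JdbcMessageStore calls come from this page.
@Configuration
public class MessageStoreConfig {

    // Assumes a DataSource bean is defined elsewhere in the application context.
    @Bean
    public JdbcMessageStore messageStore(DataSource dataSource) {
        JdbcMessageStore store = new JdbcMessageStore(dataSource);
        // Hypothetical region name, used to partition messages within shared tables.
        store.setRegion("ORDERS");
        // Leave the start-up table check enabled so a missing schema fails fast.
        store.setCheckDatabaseOnStart(true);
        return store;
    }
}
```

Passing false to setCheckDatabaseOnStart(boolean) would skip the start-up query, which may be useful when the tables are created only after the context has started.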
- Since:
- 2.0
- Author:
- Dave Syer, Oleg Zhurakousky, Matt Stine, Gunnar Hillert, Will Schipp, Gary Russell, Artem Bilan, Ngoc Nhan, Youbin Wu
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.store.MessageGroupStore
MessageGroupStore.MessageGroupCallback
-
Field Summary
static final String DEFAULT_TABLE_PREFIX - Default value for the table prefix property.
Fields inherited from class org.springframework.integration.store.AbstractMessageGroupStore
GROUP_ID_MUST_NOT_BE_NULL, INTERRUPTED_WHILE_OBTAINING_LOCK, logger
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
JdbcMessageStore(DataSource dataSource) - Create a MessageStore with all mandatory properties.
JdbcMessageStore(JdbcOperations jdbcOperations) - Create a MessageStore with all mandatory properties.
-
Method Summary
void addAllowedPatterns(String... patterns) - Add patterns for packages/classes that are allowed to be deserialized.
<T> Message<T> addMessage(Message<T> message) - Put the provided Message into the MessageStore.
protected void doAddMessagesToGroup(Object groupId, Message<?>... messages)
protected void doCompleteGroup(Object groupId)
protected Message<?> doPollForMessage(String groupIdKey) - This method executes a call to the DB to get the oldest Message in the MessageGroup. Override this method if needed.
protected Message<?> doPollMessageFromGroup(Object groupId)
protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId)
protected void doRemoveMessageGroup(Object groupId)
protected void doRemoveMessagesFromGroup(Object groupId, Collection<Message<?>> messages)
protected void doSetGroupCondition(Object groupId, String condition)
protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber)
MessageGroupMetadata getGroupMetadata(Object groupId) - Obtain the group metadata without fetching any messages; must supply all other group properties; may include the id of the first message.
protected JdbcOperations getJdbcOperations() - To be used to get a reference to JdbcOperations in case this class is subclassed.
Message<?> getMessage(UUID id)
long getMessageCount() - Optional attribute giving the number of messages in the store.
int getMessageCountForAllMessageGroups() - Optional attribute giving the number of messages in the store over all groups.
Message<?> getMessageFromGroup(Object groupId, UUID messageId) - Retrieve a Message from a group by id.
MessageGroup getMessageGroup(Object groupId) - Return all Messages currently in the MessageStore that were stored using BasicMessageGroupStore.addMessageToGroup(Object, Message) with this group id.
int getMessageGroupCount() - Optional attribute giving the number of message groups.
MessageMetadata getMessageMetadata(UUID id)
Collection<Message<?>> getMessagesForGroup(Object groupId) - Retrieve messages for the provided group id.
Message<?> getOneMessageFromGroup(Object groupId) - Return the one Message from MessageGroup.
protected String getQuery(org.springframework.integration.jdbc.store.JdbcMessageStore.Query base) - Replace patterns in the input to produce a valid SQL query.
boolean isAutoStartup()
boolean isRunning()
Iterator<MessageGroup> iterator()
int messageGroupSize(Object groupId) - Return the size of this MessageGroup.
Message<?> removeMessage(UUID id) - Remove the Message with the given id from the MessageStore, if present, and return it.
void setBeanClassLoader(ClassLoader classLoader)
void setCheckDatabaseOnStart(boolean checkDatabaseOnStart) - The flag to perform a database check query on start or not.
void setDeserializer(Deserializer<? extends Message<?>> deserializer) - A converter for deserializing byte arrays to messages.
void setRegion(String region) - A unique grouping identifier for all messages persisted with this store.
void setSerializer(Serializer<? super Message<?>> serializer) - A converter for serializing messages to byte arrays for storage.
void setTablePrefix(String tablePrefix) - Public setter for the table prefix property.
void start()
void stop()
Stream<Message<?>> streamMessagesForGroup(Object groupId) - Return a stream for messages stored in the provided group.
Methods inherited from class org.springframework.integration.store.AbstractMessageGroupStore
addMessagesToGroup, addMessageToGroup, completeGroup, copy, executeLocked, executeLocked, expireMessageGroups, getLockRegistry, getMessageGroupFactory, isTimeoutOnIdle, pollMessageFromGroup, registerMessageGroupExpiryCallback, removeMessageFromGroupById, removeMessageGroup, removeMessagesFromGroup, removeMessagesFromGroup, setExpiryCallbacks, setGroupCondition, setLastReleasedSequenceNumberForGroup, setLazyLoadMessageGroups, setLockRegistry, setTimeoutOnIdle
Methods inherited from class org.springframework.integration.store.AbstractBatchingMessageGroupStore
getRemoveBatchSize, setMessageGroupFactory, setRemoveBatchSize
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface java.lang.Iterable
forEach, spliterator
Methods inherited from interface org.springframework.context.SmartLifecycle
getPhase, stop
-
Field Details
-
DEFAULT_TABLE_PREFIX
Default value for the table prefix property.
-
Constructor Details
-
JdbcMessageStore
Create a MessageStore with all mandatory properties.
- Parameters:
dataSource - a DataSource
-
JdbcMessageStore
Create a MessageStore with all mandatory properties.
- Parameters:
jdbcOperations - a JdbcOperations
- Since:
- 4.3.9
-
-
Method Details
-
setBeanClassLoader
- Specified by:
setBeanClassLoader in interface BeanClassLoaderAware
-
setTablePrefix
Public setter for the table prefix property. This will be prefixed to all the table names before queries are executed. Defaults to DEFAULT_TABLE_PREFIX.
- Parameters:
tablePrefix - the tablePrefix to set
-
setRegion
A unique grouping identifier for all messages persisted with this store. Using multiple regions allows the store to be partitioned (if necessary) for different purposes. Defaults to DEFAULT.
- Parameters:
region - the region name to set
-
setSerializer
A converter for serializing messages to byte arrays for storage.
- Parameters:
serializer - the serializer to set
-
setDeserializer
A converter for deserializing byte arrays to messages.
- Parameters:
deserializer - the deserializer to set
-
addAllowedPatterns
Add patterns for packages/classes that are allowed to be deserialized. A class can be fully qualified, or a wildcard '*' is allowed at the beginning or end of the class name. Examples: com.foo.*, *.MyClass.
- Parameters:
patterns - the patterns.
- Since:
- 5.4
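To make the wildcard rule above concrete, the following is a minimal, JDK-only sketch of how such patterns could be matched. It is not the library's implementation; the class AllowedPatternSketch and its methods are hypothetical, and it supports only an exact name or a single '*' at the beginning or end of a pattern.

```java
import java.util.List;

// Hypothetical matcher for illustration only; not the library's implementation.
final class AllowedPatternSketch {

    private AllowedPatternSketch() {
    }

    // True if className equals the pattern, or matches a pattern that has
    // a single '*' at its end (prefix match) or at its beginning (suffix match).
    static boolean matches(String pattern, String className) {
        if (pattern.endsWith("*")) {
            return className.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        if (pattern.startsWith("*")) {
            return className.endsWith(pattern.substring(1));
        }
        return pattern.equals(className);
    }

    // True if at least one of the patterns matches the class name.
    static boolean isAllowed(List<String> patterns, String className) {
        return patterns.stream().anyMatch(p -> matches(p, className));
    }

    public static void main(String[] args) {
        List<String> patterns = List.of("com.foo.*", "*.MyClass");
        System.out.println(isAllowed(patterns, "com.foo.Order"));   // true
        System.out.println(isAllowed(patterns, "org.bar.MyClass")); // true
        System.out.println(isAllowed(patterns, "org.bar.Other"));   // false
    }
}
```

With the real store, the same two example patterns would be registered via addAllowedPatterns("com.foo.*", "*.MyClass").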
-
setCheckDatabaseOnStart
public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart)
The flag to perform a database check query on start or not.
- Parameters:
checkDatabaseOnStart - false to not perform the database check.
- Since:
- 6.2
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup in interface SmartLifecycle
-
start
public void start()
-
stop
public void stop()
-
isRunning
public boolean isRunning()
-
removeMessage
Description copied from interface: MessageStore
Remove the Message with the given id from the MessageStore, if present, and return it. If no Message with that id is present in the store, this will return null. If this method is implemented on a MessageGroupStore, the message is removed from the store only if no groups hold this message.
- Specified by:
removeMessage in interface MessageStore
- Parameters:
id - the message identifier.
- Returns:
- the message (if any).
-
getMessageCount
Description copied from interface: MessageStore
Optional attribute giving the number of messages in the store. Implementations may decline to respond by throwing an exception.
- Specified by:
getMessageCount in interface MessageStore
- Returns:
- The number of messages.
-
getMessage
- Specified by:
getMessage in interface MessageStore
- Parameters:
id - The message identifier.
- Returns:
- The Message with the given id, or null if no Message with that id exists in the MessageStore.
-
getMessageMetadata
Description copied from interface: MessageStore
- Specified by:
getMessageMetadata in interface MessageStore
- Parameters:
id - The message identifier.
- Returns:
- The MessageMetadata with the given id, or null if no Message with that id exists in the MessageStore or the message has no metadata (legacy message from an earlier version).
-
addMessage
Description copied from interface: MessageStore
Put the provided Message into the MessageStore. The store may need to mutate the message internally, and if it does, the return value can differ from the input. The id of the return value will be used as an index so that MessageStore.getMessage(UUID) and MessageStore.removeMessage(UUID) behave properly. Since messages are immutable, putting the same message more than once is a no-op.
- Specified by:
addMessage in interface MessageStore
- Type Parameters:
T - The payload type.
- Parameters:
message - The message.
- Returns:
- The message that was stored.
-
doAddMessagesToGroup
- Specified by:
doAddMessagesToGroup in class AbstractMessageGroupStore
-
getMessageGroupCount
Description copied from interface: MessageGroupStore
Optional attribute giving the number of message groups. Implementations may decline to respond by throwing an exception.
- Specified by:
getMessageGroupCount in interface MessageGroupStore
- Overrides:
getMessageGroupCount in class AbstractMessageGroupStore
- Returns:
- the number of message groups
-
getMessageCountForAllMessageGroups
Description copied from interface: MessageGroupStore
Optional attribute giving the number of messages in the store over all groups. Implementations may decline to respond by throwing an exception.
- Specified by:
getMessageCountForAllMessageGroups in interface MessageGroupStore
- Overrides:
getMessageCountForAllMessageGroups in class AbstractMessageGroupStore
- Returns:
- the number of messages
-
messageGroupSize
Description copied from interface: BasicMessageGroupStore
Return the size of this MessageGroup.
- Specified by:
messageGroupSize in interface BasicMessageGroupStore
- Parameters:
groupId - The group identifier.
- Returns:
- The size.
-
getMessageGroup
Description copied from interface: BasicMessageGroupStore
Return all Messages currently in the MessageStore that were stored using BasicMessageGroupStore.addMessageToGroup(Object, Message) with this group id.
- Specified by:
getMessageGroup in interface BasicMessageGroupStore
- Parameters:
groupId - The group identifier.
- Returns:
- A group of messages, empty if none exists for this key.
-
getGroupMetadata
Description copied from interface: MessageGroupStore
Obtain the group metadata without fetching any messages; must supply all other group properties; may include the id of the first message.
- Specified by:
getGroupMetadata in interface MessageGroupStore
- Overrides:
getGroupMetadata in class AbstractMessageGroupStore
- Parameters:
groupId - The group id.
- Returns:
- The metadata.
-
doRemoveMessagesFromGroup
- Specified by:
doRemoveMessagesFromGroup in class AbstractMessageGroupStore
-
getMessageFromGroup
Description copied from interface: MessageGroupStore
Retrieve a Message from a group by id. Return null if the message does not belong to the requested group.
- Specified by:
getMessageFromGroup in interface MessageGroupStore
- Parameters:
groupId - The groupId for the group containing the message.
messageId - The message id.
- Returns:
- the message by id if it belongs to the requested group.
-
doRemoveMessageFromGroupById
- Overrides:
doRemoveMessageFromGroupById in class AbstractMessageGroupStore
-
doRemoveMessageGroup
- Specified by:
doRemoveMessageGroup in class AbstractMessageGroupStore
-
doCompleteGroup
- Specified by:
doCompleteGroup in class AbstractMessageGroupStore
-
doSetGroupCondition
- Specified by:
doSetGroupCondition in class AbstractMessageGroupStore
-
doSetLastReleasedSequenceNumberForGroup
- Specified by:
doSetLastReleasedSequenceNumberForGroup in class AbstractMessageGroupStore
-
doPollMessageFromGroup
- Specified by:
doPollMessageFromGroup in class AbstractMessageGroupStore
-
getOneMessageFromGroup
Description copied from interface: MessageGroupStore
Return the one Message from MessageGroup.
- Specified by:
getOneMessageFromGroup in interface MessageGroupStore
- Parameters:
groupId - The group identifier.
- Returns:
- the Message.
-
getMessagesForGroup
Description copied from interface: MessageGroupStore
Retrieve messages for the provided group id.
- Specified by:
getMessagesForGroup in interface MessageGroupStore
- Parameters:
groupId - The group id to retrieve messages for.
- Returns:
- the messages for the group.
-
streamMessagesForGroup
Description copied from interface: MessageGroupStore
Return a stream for messages stored in the provided group. The persistent implementations return a Stream which has to be closed once fully processed (e.g. through a try-with-resources clause). By default, it streams a result of MessageGroupStore.getMessagesForGroup(Object).
- Specified by:
streamMessagesForGroup in interface MessageGroupStore
- Parameters:
groupId - the group id to retrieve messages for.
- Returns:
- the Stream for messages in this group.
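The closing requirement above can be sketched with plain JDK streams. This is an illustration only: the class StreamCloseSketch and its static streamMessagesForGroup method are hypothetical stand-ins for a database-backed store, with an AtomicBoolean standing in for the underlying JDBC resources that a close handler would release.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical stand-in for a persistent store; not the library's implementation.
final class StreamCloseSketch {

    private StreamCloseSketch() {
    }

    // Returns a stream whose close handler marks the (simulated) resources as released.
    static Stream<String> streamMessagesForGroup(List<String> stored, AtomicBoolean released) {
        return stored.stream().onClose(() -> released.set(true));
    }

    // Consumes the stream inside try-with-resources so that it is always closed.
    static int countAndClose(List<String> stored, AtomicBoolean released) {
        try (Stream<String> messages = streamMessagesForGroup(stored, released)) {
            return (int) messages.count();
        }
    }

    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean(false);
        int count = countAndClose(List.of("a", "b", "c"), released);
        System.out.println(count + " messages, resources released: " + released.get());
    }
}
```

Against the real store, the same shape would apply to the Stream<Message<?>> returned by streamMessagesForGroup(Object): open it in a try-with-resources clause and finish processing inside the block.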
-
iterator
- Specified by:
iterator in interface Iterable<MessageGroup>
- Specified by:
iterator in interface MessageGroupStore
- Returns:
- The iterator of currently accumulated MessageGroups.
-
getQuery
Replace patterns in the input to produce a valid SQL query. This implementation lazily initializes a simple map-based cache, only replacing the table prefix on the first access to a named query. Further accesses will be resolved from the cache.
- Parameters:
base - the SQL query to be transformed
- Returns:
- a transformed query with replacements
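The lazy, map-based caching described above might look roughly like the following JDK-only sketch. It is not the library's code: the class QueryCacheSketch, the nested Query enum, the %PREFIX% placeholder, the MESSAGE_GROUP table name, and the replacements counter are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a lazily populated query cache; not the library's implementation.
final class QueryCacheSketch {

    // Hypothetical named query template; the %PREFIX% placeholder is an assumption.
    enum Query {
        COUNT_GROUPS("SELECT COUNT(*) FROM %PREFIX%MESSAGE_GROUP WHERE REGION = ?");

        private final String sql;

        Query(String sql) {
            this.sql = sql;
        }
    }

    private final Map<Query, String> cache = new ConcurrentHashMap<>();
    private final String tablePrefix;

    // Counts how often the prefix replacement actually runs (for demonstration only).
    final AtomicInteger replacements = new AtomicInteger();

    QueryCacheSketch(String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }

    // Replaces the placeholder on first access; later calls are served from the cache.
    String getQuery(Query base) {
        return cache.computeIfAbsent(base, q -> {
            replacements.incrementAndGet();
            return q.sql.replace("%PREFIX%", tablePrefix);
        });
    }

    public static void main(String[] args) {
        QueryCacheSketch sketch = new QueryCacheSketch("INT_");
        System.out.println(sketch.getQuery(Query.COUNT_GROUPS));
        System.out.println(sketch.getQuery(Query.COUNT_GROUPS));
        System.out.println("replacements performed: " + sketch.replacements.get());
    }
}
```

Keying the cache by the named query means the string replacement cost is paid once per query rather than on every database call.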
-
getJdbcOperations
To be used to get a reference to JdbcOperations in case this class is subclassed.
- Returns:
- the JdbcOperations implementation
-
doPollForMessage
This method executes a call to the DB to get the oldest Message in the MessageGroup. Override this method if needed, for example if your database supports advanced functions such as FIRST.
- Parameters:
groupIdKey - String representation of the message group ID
- Returns:
- a message; could be null if the query produced no Messages
-