@ManagedResource public class JdbcMessageStore extends AbstractMessageGroupStore implements MessageStore, InitializingBean
Implementation of MessageStore using a relational database via JDBC. SQL scripts to create the necessary tables are packaged as org/springframework/integration/jdbc/schema-*.sql, where * is the target database type.
Notice: Starting with Spring Integration 3.0, this class will move to package: org.springframework.integration.jdbc.store.
If you intend to back a MessageChannel with a JDBC-based message store, please consider using the channel-specific JdbcChannelMessageStore instead.
Nested classes/interfaces inherited from interface MessageGroupStore: MessageGroupStore.MessageGroupCallback
Modifier and Type | Field and Description |
---|---|
static String | CREATED_DATE_KEY: The name of the message header that stores a timestamp for the time the message was inserted. |
static int | DEFAULT_LONG_STRING_LENGTH |
static String | DEFAULT_TABLE_PREFIX: Default value for the table prefix property. |
static String | SAVED_KEY: The name of the message header that stores a flag to indicate that the message has been saved. |
Constructor and Description |
---|
JdbcMessageStore(): Convenient constructor for configuration use. |
JdbcMessageStore(DataSource dataSource): Create a MessageStore with all mandatory properties. |
Modifier and Type | Method and Description |
---|---|
<T> Message<T> | addMessage(Message<T> message): Put the provided Message into the MessageStore. |
MessageGroup | addMessageToGroup(Object groupId, Message<?> message): Store a message with an association to a group id. |
void | afterPropertiesSet() |
void | completeGroup(Object groupId): Completes this MessageGroup. |
protected Message<?> | doPollForMessage(String groupIdKey): Executes a call to the database to get the oldest Message in the MessageGroup. Override this method if needed. |
protected JdbcOperations | getJdbcOperations(): To be used to get a reference to JdbcOperations in case this class is subclassed. |
Message<?> | getMessage(UUID id) |
long | getMessageCount(): Optional attribute giving the number of messages in the store. |
int | getMessageCountForAllMessageGroups(): Optional attribute giving the number of messages in the store over all groups. |
MessageGroup | getMessageGroup(Object groupId): Return all Messages currently in the MessageStore that were stored using BasicMessageGroupStore.addMessageToGroup(Object, Message) with this group id. |
int | getMessageGroupCount(): Optional attribute giving the number of message groups. |
protected String | getQuery(org.springframework.integration.jdbc.JdbcMessageStore.Query base): Replace patterns in the input to produce a valid SQL query. |
Iterator<MessageGroup> | iterator() |
int | messageGroupSize(Object groupId): Returns the size of this MessageGroup. |
Message<?> | pollMessageFromGroup(Object groupId): Polls a Message from this MessageGroup (in FIFO style if supported by the implementation) while also removing the polled Message. |
Message<?> | removeMessage(UUID id): Remove the Message with the given id from the MessageStore, if present, and return it. |
MessageGroup | removeMessageFromGroup(Object groupId, Message<?> messageToRemove): Persist a deletion on a single message from the group. |
void | removeMessageGroup(Object groupId): Remove the message group with this id. |
void | setDataSource(DataSource dataSource): The JDBC DataSource to use when interacting with the database. |
void | setDeserializer(Deserializer<? extends Message<?>> deserializer): A converter for deserializing byte arrays to messages. |
void | setJdbcTemplate(JdbcOperations jdbcTemplate): The JdbcOperations to use when interacting with the database. |
void | setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber): Allows you to set the sequence number of the last released Message. |
void | setLobHandler(LobHandler lobHandler): Override the LobHandler that is used to create and unpack large objects in SQL queries. |
void | setRegion(String region): A unique grouping identifier for all messages persisted with this store. |
void | setSerializer(Serializer<? super Message<?>> serializer): A converter for serializing messages to byte arrays for storage. |
void | setTablePrefix(String tablePrefix): Public setter for the table prefix property. |
Methods inherited from class AbstractMessageGroupStore: expireMessageGroups, getMessageBuilderFactory, isTimeoutOnIdle, registerMessageGroupExpiryCallback, setBeanFactory, setExpiryCallbacks, setTimeoutOnIdle
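public static final String DEFAULT_TABLE_PREFIX
Default value for the table prefix property.
public static final int DEFAULT_LONG_STRING_LENGTH
public static final String SAVED_KEY
The name of the message header that stores a flag to indicate that the message has been saved.
public static final String CREATED_DATE_KEY
The name of the message header that stores a timestamp for the time the message was inserted.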
public JdbcMessageStore()
Convenient constructor for configuration use.
public JdbcMessageStore(DataSource dataSource)
Create a MessageStore with all mandatory properties.
Parameters:
dataSource - a DataSource
public void setTablePrefix(String tablePrefix)
Public setter for the table prefix property. Defaults to DEFAULT_TABLE_PREFIX.
Parameters:
tablePrefix - the tablePrefix to set
public void setRegion(String region)
A unique grouping identifier for all messages persisted with this store. Defaults to DEFAULT.
Parameters:
region - the region name to set
public void setDataSource(DataSource dataSource)
The JDBC DataSource to use when interacting with the database. Set either this property or jdbcTemplate.
Parameters:
dataSource - a DataSource
public void setJdbcTemplate(JdbcOperations jdbcTemplate)
The JdbcOperations to use when interacting with the database. Set either this property or dataSource.
Parameters:
jdbcTemplate - a JdbcOperations
public void setLobHandler(LobHandler lobHandler)
Override the LobHandler that is used to create and unpack large objects in SQL queries. The default is fine for almost all platforms, but some Oracle drivers require a native implementation.
Parameters:
lobHandler - a LobHandler
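public void setSerializer(Serializer<? super Message<?>> serializer)
A converter for serializing messages to byte arrays for storage.
Parameters:
serializer - the serializer to set
public void setDeserializer(Deserializer<? extends Message<?>> deserializer)
A converter for deserializing byte arrays to messages.
Parameters:
deserializer - the deserializer to set
public void afterPropertiesSet() throws Exception
Specified by: afterPropertiesSet in interface InitializingBean
Throws: Exception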
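
The serializer and deserializer properties above describe converting a message to a byte array for storage and back again. As a rough illustration of that round trip, here is a minimal sketch using plain Java serialization. It does not use the Spring Serializer or Deserializer interfaces, and this page does not state which converters the class uses by default. ByteArrayRoundTripSketch and its method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration class; not part of Spring Integration.
public class ByteArrayRoundTripSketch {

    // Convert a serializable value to bytes, as a serializer for storage would.
    public static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Rebuild the value from the stored bytes, as a deserializer would on read.
    public static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] stored = serialize("payload");
        System.out.println(deserialize(stored));
    }
}
```

A custom converter pair is typically only worth supplying when the stored form has to differ from what the default produces, for example to keep the column readable by other tools.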
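public Message<?> removeMessage(UUID id)
Description copied from interface: MessageStore
Remove the Message with the given id from the MessageStore, if present, and return it.
Specified by: removeMessage in interface MessageStore
Parameters:
id - The message identifier.
@ManagedAttribute public long getMessageCount()
Description copied from interface: MessageStore
Optional attribute giving the number of messages in the store.
Specified by: getMessageCount in interface MessageStore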
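public Message<?> getMessage(UUID id)
Specified by: getMessage in interface MessageStore
Parameters:
id - The message identifier.
public <T> Message<T> addMessage(Message<T> message)
Description copied from interface: MessageStore
Put the provided Message into the MessageStore. The id of the stored message is used as an index so that MessageStore.getMessage(UUID) and MessageStore.removeMessage(UUID) behave properly. Since messages are immutable, putting the same message more than once is a no-op.
Specified by: addMessage in interface MessageStore
Type Parameters:
T - The payload type.
Parameters:
message - The message.
public MessageGroup addMessageToGroup(Object groupId, Message<?> message)
Description copied from interface: BasicMessageGroupStore
Store a message with an association to a group id.
Specified by: addMessageToGroup in interface BasicMessageGroupStore
Parameters:
groupId - The group id to store the message under.
message - A message.
@ManagedAttribute public int getMessageGroupCount()
Description copied from interface: MessageGroupStore
Optional attribute giving the number of message groups.
Specified by: getMessageGroupCount in interface MessageGroupStore
Overrides: getMessageGroupCount in class AbstractMessageGroupStore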
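@ManagedAttribute public int getMessageCountForAllMessageGroups()
Description copied from interface: MessageGroupStore
Optional attribute giving the number of messages in the store over all groups.
Specified by: getMessageCountForAllMessageGroups in interface MessageGroupStore
Overrides: getMessageCountForAllMessageGroups in class AbstractMessageGroupStore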
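@ManagedAttribute public int messageGroupSize(Object groupId)
Description copied from interface: BasicMessageGroupStore
Returns the size of this MessageGroup.
Specified by: messageGroupSize in interface BasicMessageGroupStore
Parameters:
groupId - The group identifier.
public MessageGroup getMessageGroup(Object groupId)
Description copied from interface: BasicMessageGroupStore
Return all Messages currently in the MessageStore that were stored using BasicMessageGroupStore.addMessageToGroup(Object, Message) with this group id.
Specified by: getMessageGroup in interface BasicMessageGroupStore
Parameters:
groupId - The group identifier.
public MessageGroup removeMessageFromGroup(Object groupId, Message<?> messageToRemove)
Description copied from interface: MessageGroupStore
Persist a deletion on a single message from the group.
Specified by: removeMessageFromGroup in interface MessageGroupStore
Parameters:
groupId - The groupId for the group containing the message.
messageToRemove - The message to be removed.
public void removeMessageGroup(Object groupId)
Description copied from interface: BasicMessageGroupStore
Remove the message group with this id.
Specified by: removeMessageGroup in interface BasicMessageGroupStore
Parameters:
groupId - The id of the group to remove.
public void completeGroup(Object groupId)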
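Description copied from interface: MessageGroupStore
Completes this MessageGroup.
Specified by: completeGroup in interface MessageGroupStore
Parameters:
groupId - The group identifier.
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber)
Description copied from interface: MessageGroupStore
Allows you to set the sequence number of the last released Message.
Specified by: setLastReleasedSequenceNumberForGroup in interface MessageGroupStore
Parameters:
groupId - The group identifier.
sequenceNumber - The sequence number.
public Message<?> pollMessageFromGroup(Object groupId)
Description copied from interface: BasicMessageGroupStore
Polls a Message from this MessageGroup (in FIFO style if supported by the implementation) while also removing the polled Message.
Specified by: pollMessageFromGroup in interface BasicMessageGroupStore
Parameters:
groupId - The group identifier.
public Iterator<MessageGroup> iterator()
Specified by: iterator in interface Iterable<MessageGroup>
Specified by: iterator in interface MessageGroupStore
Returns: an iterator over the MessageGroups.
protected String getQuery(org.springframework.integration.jdbc.JdbcMessageStore.Query base)
Replace patterns in the input to produce a valid SQL query.
Parameters:
base - the SQL query to be transformed
protected JdbcOperations getJdbcOperations()
To be used to get a reference to JdbcOperations in case this class is subclassed.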
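
To make the pattern replacement described for getQuery more concrete, here is a minimal standalone sketch of substituting a table prefix into a query template. It is not the class's implementation. The %PREFIX% token, the INT_ prefix value, the table and column names, and the QueryTemplateSketch class are all assumptions chosen for illustration.

```java
// Hypothetical illustration class; not part of Spring Integration.
public class QueryTemplateSketch {

    // Assumed placeholder token; this page does not state the token the real class uses.
    static final String PREFIX_TOKEN = "%PREFIX%";

    private final String tablePrefix;

    public QueryTemplateSketch(String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }

    // Replace every occurrence of the placeholder with the configured table prefix.
    public String getQuery(String base) {
        return base.replace(PREFIX_TOKEN, tablePrefix);
    }

    public static void main(String[] args) {
        QueryTemplateSketch sketch = new QueryTemplateSketch("INT_");
        // Table and column names below are made up for the example.
        System.out.println(sketch.getQuery("SELECT COUNT(MESSAGE_ID) FROM %PREFIX%MESSAGE WHERE REGION = ?"));
    }
}
```

Keeping the prefix out of the query text lets one set of SQL templates serve installations whose tables were created under different prefixes, which is the role setTablePrefix plays in the configuration.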
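protected Message<?> doPollForMessage(String groupIdKey)
Executes a call to the database to get the oldest Message in the MessageGroup. Override this method if needed.
Parameters:
groupIdKey - String representation of message group ID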
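
The behaviour that pollMessageFromGroup and doPollForMessage describe (oldest message first, removed as it is read) can be modelled without a database. The sketch below is an in-memory illustration only. FifoGroupSketch, its String payloads, and returning null for an empty or unknown group are assumptions, and the real class issues SQL through JdbcOperations instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of Spring Integration.
public class FifoGroupSketch {

    // One insertion-ordered queue per group id; the head of each queue is the oldest message.
    private final Map<Object, Deque<String>> groups = new HashMap<>();

    public void addMessageToGroup(Object groupId, String message) {
        groups.computeIfAbsent(groupId, id -> new ArrayDeque<>()).addLast(message);
    }

    public int messageGroupSize(Object groupId) {
        Deque<String> group = groups.get(groupId);
        return group == null ? 0 : group.size();
    }

    // Return and remove the oldest message; null (an assumption) if the group is empty or unknown.
    public String pollMessageFromGroup(Object groupId) {
        Deque<String> group = groups.get(groupId);
        return group == null ? null : group.pollFirst();
    }

    public static void main(String[] args) {
        FifoGroupSketch store = new FifoGroupSketch();
        store.addMessageToGroup("orders", "first");
        store.addMessageToGroup("orders", "second");
        System.out.println(store.pollMessageFromGroup("orders"));
        System.out.println(store.messageGroupSize("orders"));
    }
}
```

In a JDBC-backed store the same ordering has to come from the query itself, which is presumably why doPollForMessage is left open for subclasses to override.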