Class JdbcMessageStore

All Implemented Interfaces:
Iterable<MessageGroup>, Aware, BeanClassLoaderAware, Lifecycle, Phased, SmartLifecycle, BasicMessageGroupStore, MessageGroupStore, MessageStore

public class JdbcMessageStore extends AbstractMessageGroupStore implements MessageStore, BeanClassLoaderAware, SmartLifecycle
Implementation of MessageStore using a relational database via JDBC. SQL scripts to create the necessary tables are packaged as org/springframework/integration/jdbc/schema-*.sql, where * is the target database type.

If you intend backing a MessageChannel using a JDBC-based Message Store, please consider using the channel-specific JdbcChannelMessageStore instead. This implementation is intended for correlation components (e.g. <aggregator>), <delayer> and similar.

This class implements SmartLifecycle and calls getMessageGroupCount() on start() to check if required tables are present in DB. The application context will fail to start if the table is not present. This check can be disabled via setCheckDatabaseOnStart(boolean).

Since:
2.0
Author:
Dave Syer, Oleg Zhurakousky, Matt Stine, Gunnar Hillert, Will Schipp, Gary Russell, Artem Bilan, Ngoc Nhan, Youbin Wu
  • Field Details

    • DEFAULT_TABLE_PREFIX

      public static final String DEFAULT_TABLE_PREFIX
      Default value for the table prefix property.
      See Also:
  • Constructor Details

  • Method Details

    • setBeanClassLoader

      public void setBeanClassLoader(ClassLoader classLoader)
      Specified by:
      setBeanClassLoader in interface BeanClassLoaderAware
    • setTablePrefix

      public void setTablePrefix(String tablePrefix)
      Public setter for the table prefix property. This will be prefixed to all the table names before queries are executed. Defaults to DEFAULT_TABLE_PREFIX.
      Parameters:
      tablePrefix - the tablePrefix to set
    • setRegion

      public void setRegion(String region)
      A unique grouping identifier for all messages persisted with this store. Using multiple regions allows the store to be partitioned (if necessary) for different purposes. Defaults to DEFAULT.
      Parameters:
      region - the region name to set
    • setSerializer

      public void setSerializer(Serializer<? super Message<?>> serializer)
      A converter for serializing messages to byte arrays for storage.
      Parameters:
      serializer - the serializer to set
    • setDeserializer

      public void setDeserializer(Deserializer<? extends Message<?>> deserializer)
      A converter for deserializing byte arrays to message.
      Parameters:
      deserializer - the deserializer to set
    • addAllowedPatterns

      public void addAllowedPatterns(String... patterns)
      Add patterns for packages/classes that are allowed to be deserialized. A class can be fully qualified or a wildcard '*' is allowed at the beginning or end of the class name. Examples: com.foo.*, *.MyClass.
      Parameters:
      patterns - the patterns.
      Since:
      5.4
    • setCheckDatabaseOnStart

      public void setCheckDatabaseOnStart(boolean checkDatabaseOnStart)
      The flag to perform a database check query on start or not.
      Parameters:
      checkDatabaseOnStart - false to not perform the database check.
      Since:
      6.2
    • isAutoStartup

      public boolean isAutoStartup()
      Specified by:
      isAutoStartup in interface SmartLifecycle
    • start

      public void start()
      Specified by:
      start in interface Lifecycle
    • stop

      public void stop()
      Specified by:
      stop in interface Lifecycle
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
    • removeMessage

      public Message<?> removeMessage(UUID id)
      Description copied from interface: MessageStore
      Remove the Message with the given id from the MessageStore, if present, and return it. If no Message with that id is present in the store, this will return null. If this method is implemented on a MessageGroupStore, the message is removed from the store only if no groups holding this message.
      Specified by:
      removeMessage in interface MessageStore
      Parameters:
      id - the message identifier.
      Returns:
      the message (if any).
    • getMessageCount

      @ManagedAttribute public long getMessageCount()
      Description copied from interface: MessageStore
      Optional attribute giving the number of messages in the store. Implementations may decline to respond by throwing an exception.
      Specified by:
      getMessageCount in interface MessageStore
      Returns:
      The number of messages.
    • getMessage

      public Message<?> getMessage(UUID id)
      Specified by:
      getMessage in interface MessageStore
      Parameters:
      id - The message identifier.
      Returns:
      The Message with the given id, or null if no Message with that id exists in the MessageStore.
    • getMessageMetadata

      public MessageMetadata getMessageMetadata(UUID id)
      Description copied from interface: MessageStore
      Return a MessageMetadata for the Message by provided id.
      Specified by:
      getMessageMetadata in interface MessageStore
      Parameters:
      id - The message identifier.
      Returns:
      The MessageMetadata with the given id, or null if no Message with that id exists in the MessageStore or the message has no metadata (legacy message from an earlier version).
    • addMessage

      public <T> Message<T> addMessage(Message<T> message)
      Description copied from interface: MessageStore
      Put the provided Message into the MessageStore. The store may need to mutate the message internally, and if it does then the return value can be different than the input. The id of the return value will be used as an index so that the MessageStore.getMessage(UUID) and MessageStore.removeMessage(UUID) behave properly. Since messages are immutable, putting the same message more than once is a no-op.
      Specified by:
      addMessage in interface MessageStore
      Type Parameters:
      T - The payload type.
      Parameters:
      message - The message.
      Returns:
      The message that was stored.
    • doAddMessagesToGroup

      protected void doAddMessagesToGroup(Object groupId, Message<?>... messages)
      Specified by:
      doAddMessagesToGroup in class AbstractMessageGroupStore
    • getMessageGroupCount

      @ManagedAttribute public int getMessageGroupCount()
      Description copied from interface: MessageGroupStore
      Optional attribute giving the number of message groups. Implementations may decline to respond by throwing an exception.
      Specified by:
      getMessageGroupCount in interface MessageGroupStore
      Overrides:
      getMessageGroupCount in class AbstractMessageGroupStore
      Returns:
      the number message groups
    • getMessageCountForAllMessageGroups

      @ManagedAttribute public int getMessageCountForAllMessageGroups()
      Description copied from interface: MessageGroupStore
      Optional attribute giving the number of messages in the store over all groups. Implementations may decline to respond by throwing an exception.
      Specified by:
      getMessageCountForAllMessageGroups in interface MessageGroupStore
      Overrides:
      getMessageCountForAllMessageGroups in class AbstractMessageGroupStore
      Returns:
      the number of messages
    • messageGroupSize

      @ManagedAttribute public int messageGroupSize(Object groupId)
      Description copied from interface: BasicMessageGroupStore
      Return the size of this MessageGroup.
      Specified by:
      messageGroupSize in interface BasicMessageGroupStore
      Parameters:
      groupId - The group identifier.
      Returns:
      The size.
    • getMessageGroup

      public MessageGroup getMessageGroup(Object groupId)
      Description copied from interface: BasicMessageGroupStore
      Return all Messages currently in the MessageStore that were stored using BasicMessageGroupStore.addMessageToGroup(Object, Message) with this group id.
      Specified by:
      getMessageGroup in interface BasicMessageGroupStore
      Parameters:
      groupId - The group identifier.
      Returns:
      A group of messages, empty if none exists for this key.
    • getGroupMetadata

      public MessageGroupMetadata getGroupMetadata(Object groupId)
      Description copied from interface: MessageGroupStore
      Obtain the group metadata without fetching any messages; must supply all other group properties; may include the id of the first message.
      Specified by:
      getGroupMetadata in interface MessageGroupStore
      Overrides:
      getGroupMetadata in class AbstractMessageGroupStore
      Parameters:
      groupId - The group id.
      Returns:
      The metadata.
    • doRemoveMessagesFromGroup

      protected void doRemoveMessagesFromGroup(Object groupId, Collection<Message<?>> messages)
      Specified by:
      doRemoveMessagesFromGroup in class AbstractMessageGroupStore
    • getMessageFromGroup

      @Nullable public Message<?> getMessageFromGroup(Object groupId, UUID messageId)
      Description copied from interface: MessageGroupStore
      Retrieve a Message from a group by id. Return null if message does not belong to the requested group.
      Specified by:
      getMessageFromGroup in interface MessageGroupStore
      Parameters:
      groupId - The groupId for the group containing the message.
      messageId - The message id.
      Returns:
      message by id if it belongs to requested group.
    • doRemoveMessageFromGroupById

      protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId)
      Overrides:
      doRemoveMessageFromGroupById in class AbstractMessageGroupStore
    • doRemoveMessageGroup

      protected void doRemoveMessageGroup(Object groupId)
      Specified by:
      doRemoveMessageGroup in class AbstractMessageGroupStore
    • doCompleteGroup

      protected void doCompleteGroup(Object groupId)
      Specified by:
      doCompleteGroup in class AbstractMessageGroupStore
    • doSetGroupCondition

      protected void doSetGroupCondition(Object groupId, String condition)
      Specified by:
      doSetGroupCondition in class AbstractMessageGroupStore
    • doSetLastReleasedSequenceNumberForGroup

      protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber)
      Specified by:
      doSetLastReleasedSequenceNumberForGroup in class AbstractMessageGroupStore
    • doPollMessageFromGroup

      protected Message<?> doPollMessageFromGroup(Object groupId)
      Specified by:
      doPollMessageFromGroup in class AbstractMessageGroupStore
    • getOneMessageFromGroup

      public Message<?> getOneMessageFromGroup(Object groupId)
      Description copied from interface: MessageGroupStore
      Return the one Message from MessageGroup.
      Specified by:
      getOneMessageFromGroup in interface MessageGroupStore
      Parameters:
      groupId - The group identifier.
      Returns:
      the Message.
    • getMessagesForGroup

      public Collection<Message<?>> getMessagesForGroup(Object groupId)
      Description copied from interface: MessageGroupStore
      Retrieve messages for the provided group id.
      Specified by:
      getMessagesForGroup in interface MessageGroupStore
      Parameters:
      groupId - The group id to retrieve messages for.
      Returns:
      the messages for group.
    • streamMessagesForGroup

      public Stream<Message<?>> streamMessagesForGroup(Object groupId)
      Description copied from interface: MessageGroupStore
      Return a stream for messages stored in the provided group. The persistent implementations return a Stream which has to be closed once fully processed (e.g. through a try-with-resources clause). By default, it streams a result of MessageGroupStore.getMessagesForGroup(Object).
      Specified by:
      streamMessagesForGroup in interface MessageGroupStore
      Parameters:
      groupId - the group id to retrieve messages.
      Returns:
      the Stream for messages in this group.
    • iterator

      public Iterator<MessageGroup> iterator()
      Specified by:
      iterator in interface Iterable<MessageGroup>
      Specified by:
      iterator in interface MessageGroupStore
      Returns:
      The iterator of currently accumulated MessageGroups.
    • getQuery

      protected String getQuery(org.springframework.integration.jdbc.store.JdbcMessageStore.Query base)
      Replace patterns in the input to produce a valid SQL query. This implementation lazily initializes a simple map-based cache, only replacing the table prefix on the first access to a named query. Further, accesses will be resolved from the cache.
      Parameters:
      base - the SQL query to be transformed
      Returns:
      a transformed query with replacements
    • getJdbcOperations

      protected JdbcOperations getJdbcOperations()
      To be used to get a reference to JdbcOperations in case this class is subclassed.
      Returns:
      the JdbcOperations implementation
    • doPollForMessage

      protected Message<?> doPollForMessage(String groupIdKey)
      This method executes a call to the DB to get the oldest Message in the MessageGroup Override this method if need to. For example if your DB supports advanced function such as FIRST etc.
      Parameters:
      groupIdKey - String representation of message group ID
      Returns:
      a message; could be null if query produced no Messages