Class PostgresChannelMessageTableSubscriber

java.lang.Object
org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber
All Implemented Interfaces:
Lifecycle, Phased, SmartLifecycle

public final class PostgresChannelMessageTableSubscriber extends Object implements SmartLifecycle
A subscriber for new messages received by a Postgres database via a JdbcChannelMessageStore. This subscriber implementation uses Postgres' LISTEN/NOTIFY mechanism to receive push notifications for new messages, which works even if a message is written and read from different JVMs or JdbcChannelMessageStores.

Note that this subscriber requires an unshared PgConnection which remains open for the subscriber's whole lifecycle. It is therefore recommended to run a single subscriber per JVM. For this reason, this subscriber is region-agnostic. To listen for messages for a given region and group id, use a PostgresChannelMessageTableSubscriber.Subscription and register it with this subscriber.

In order to function, the Postgres database that is used must define a trigger for sending notifications upon newly arrived messages. This trigger is defined in the schema-postgresql.sql file within this artifact but commented out.
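
The region-agnostic design described above can be pictured with a small model: one shared listener receives every notification and routes it to the subscriptions registered for the matching region and group id. The following is a minimal sketch using only the JDK, not the library's implementation. The class name RegionGroupDispatcher, its methods, and the "<region> <groupId>" payload format are assumptions made for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for a region-agnostic subscriber: one instance serves
// every region and group id, and routes each notification by its payload.
final class RegionGroupDispatcher {

    // Callbacks keyed by "<region> <groupId>" (an assumed payload format).
    private final Map<String, Set<Runnable>> callbacks = new ConcurrentHashMap<>();

    // Builds the lookup key for a region and group id.
    static String key(String region, String groupId) {
        return region + " " + groupId;
    }

    // Registers a callback for one region and group id.
    void register(String region, String groupId, Runnable callback) {
        callbacks.computeIfAbsent(key(region, groupId), k -> new CopyOnWriteArraySet<>())
                .add(callback);
    }

    // Invokes every callback registered for the payload; returns how many ran.
    int dispatch(String payload) {
        Set<Runnable> targets = callbacks.getOrDefault(payload, Set.of());
        targets.forEach(Runnable::run);
        return targets.size();
    }
}
```

In this model, a notification whose payload matches no registered key is simply dropped, which is why a single listener connection can serve any number of regions and groups.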

Since:
6.0
Author:
Rafael Winterhalter, Artem Bilan, Igor Lovich, Christian Tzolov, Johannes Edmeier
  • Constructor Details

    • PostgresChannelMessageTableSubscriber

      public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier)
      Create a new subscriber using the JdbcChannelMessageStore.DEFAULT_TABLE_PREFIX.
      Parameters:
      connectionSupplier - The connection supplier for the targeted Postgres database.
    • PostgresChannelMessageTableSubscriber

      public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix)
      Create a new subscriber.
      Parameters:
      connectionSupplier - The connection supplier for the targeted Postgres database.
      tablePrefix - The table prefix of the JdbcChannelMessageStore to subscribe to.
  • Method Details

    • setTaskExecutor

      public void setTaskExecutor(AsyncTaskExecutor taskExecutor)
      Provide a managed AsyncTaskExecutor for the Postgres listener daemon.
      Parameters:
      taskExecutor - the AsyncTaskExecutor to use.
      Since:
      6.2
    • setNotificationTimeout

      public void setNotificationTimeout(Duration notificationTimeout)
      Set the timeout for the notification polling. If no notifications are received for the specified duration, the underlying connection is closed and re-established. Setting a value of Duration.ZERO disables the timeout and waits forever. This might cause problems in DB failover scenarios.
      Parameters:
      notificationTimeout - the timeout for the notification polling.
      Since:
      6.1.8
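
      The timeout semantics described for setNotificationTimeout can be modeled with plain JDK types. This is a sketch under stated assumptions, not the library's code: NotificationPoller and its methods are hypothetical, and a BlockingQueue stands in for the Postgres connection that delivers notifications.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a notification poll with a timeout; a BlockingQueue
// stands in for the Postgres connection that delivers notifications.
final class NotificationPoller {

    // Returns the next notification, or empty if none arrived within the
    // timeout. Duration.ZERO means "no timeout": block until one arrives.
    static Optional<String> poll(BlockingQueue<String> source, Duration timeout) {
        try {
            if (timeout.isZero()) {
                return Optional.of(source.take());
            }
            return Optional.ofNullable(
                    source.poll(timeout.toMillis(), TimeUnit.MILLISECONDS));
        }
        catch (InterruptedException ex) {
            // Preserve the interrupt flag and report "nothing received".
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    // An empty result is the signal to close and re-establish the connection.
    static boolean shouldReconnect(Optional<String> polled) {
        return !polled.isPresent();
    }
}
```

      The model also hints at why Duration.ZERO can be risky in failover scenarios: with no timeout, a connection that has silently gone stale never produces the empty result that would trigger a reconnect.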
    • subscribe

      public boolean subscribe(PostgresChannelMessageTableSubscriber.Subscription subscription)
      Add a new subscription to this subscriber.
      Parameters:
      subscription - The subscription to register.
      Returns:
      true if the subscription was not already added.
    • unsubscribe

      public boolean unsubscribe(PostgresChannelMessageTableSubscriber.Subscription subscription)
      Remove a previous subscription from this subscriber.
      Parameters:
      subscription - The subscription to remove.
      Returns:
      true if the subscription was previously registered and is now removed.
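
      The boolean results of subscribe and unsubscribe follow set semantics. The sketch below models only that contract with a JDK set. SubscriptionSetModel is a hypothetical name, and nothing here is taken from the library's source.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the return-value contract; not the library's code.
final class SubscriptionSetModel<S> {

    // Thread-safe set, since subscriptions may be changed from several threads.
    private final Set<S> subscriptions = ConcurrentHashMap.newKeySet();

    // true only if the subscription was not already present.
    boolean subscribe(S subscription) {
        return subscriptions.add(subscription);
    }

    // true only if the subscription was present and has now been removed.
    boolean unsubscribe(S subscription) {
        return subscriptions.remove(subscription);
    }
}
```

      Under this model, subscribing the same instance twice returns true and then false, and unsubscribing it twice behaves the same way, so callers can use the result to detect duplicate registrations.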
    • start

      public void start()
      Specified by:
      start in interface Lifecycle
    • stop

      public void stop()
      Specified by:
      stop in interface Lifecycle
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle