Class CachingPulsarProducerFactory<T>

java.lang.Object
org.springframework.pulsar.core.DefaultPulsarProducerFactory<T>
org.springframework.pulsar.core.CachingPulsarProducerFactory<T>
Type Parameters:
T - producer type.
All Implemented Interfaces:
DisposableBean, Lifecycle, Phased, SmartLifecycle, PulsarProducerFactory<T>

public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactory<T>
A PulsarProducerFactory that extends the default implementation by caching the created producers.

The created producer is wrapped in a proxy so that calls to Producer.close() do not actually close it. The actual close occurs when the producer is evicted from the cache or when DisposableBean.destroy() is invoked.

The proxied producer is cached in an LRU fashion and evicted when it has not been used within a configured time period.

Author:
Chris Bono, Alexander Preuß, Christophe Bornet
  • Constructor Details

    • CachingPulsarProducerFactory

      public CachingPulsarProducerFactory(org.apache.pulsar.client.api.PulsarClient pulsarClient, @Nullable String defaultTopic, List<ProducerBuilderCustomizer<T>> defaultConfigCustomizers, TopicResolver topicResolver, Duration cacheExpireAfterAccess, Long cacheMaximumSize, Integer cacheInitialCapacity)
      Construct a caching producer factory with the specified values for the cache configuration.
      Parameters:
      pulsarClient - the client used to create the producers
      defaultTopic - the default topic to use for the producers
      defaultConfigCustomizers - the optional list of customizers to apply to the created producers
      topicResolver - the topic resolver to use
      cacheExpireAfterAccess - time period to expire unused entries in the cache
      cacheMaximumSize - maximum size of cache (entries)
      cacheInitialCapacity - the initial size of cache
  • Method Details

    • doCreateProducer

      protected org.apache.pulsar.client.api.Producer<T> doCreateProducer(org.apache.pulsar.client.api.Schema<T> schema, @Nullable String topic, @Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers)
      Description copied from class: DefaultPulsarProducerFactory
      Create the actual producer.
      Overrides:
      doCreateProducer in class DefaultPulsarProducerFactory<T>
      Parameters:
      schema - the schema of the messages to be sent
      topic - the topic the producer will send messages to or null to use the default topic
      encryptionKeys - the encryption keys used by the producer, replacing the default encryption keys or null to use the default encryption keys. Beware that ProducerBuilder only has ProducerBuilder.addEncryptionKey(java.lang.String) and doesn't have methods to replace the encryption keys.
      customizers - the optional list of customizers to apply to the producer builder
      Returns:
      the created producer
    • getPhase

      public int getPhase()
      Return the phase that this lifecycle object is supposed to run in.

      Because this object depends on the restartable client, it uses a phase slightly larger than the one used by the restartable client. This ensures that it starts after and stops before the restartable client.

      Specified by:
      getPhase in interface Phased
      Specified by:
      getPhase in interface SmartLifecycle
      Returns:
      the phase to execute in (just after the restartable client)
      See Also:
    • currentState

      public AtomicReference<org.springframework.pulsar.core.RestartableComponentSupport.State> currentState()
    • logger

      public LogAccessor logger()
    • doStop

      public void doStop()
    • isRunning

      default boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
    • start

      default void start()
      Specified by:
      start in interface Lifecycle
    • doStart

      default void doStart()
      Callback invoked during startup - default implementation does nothing.
    • stop

      default void stop()
      Specified by:
      stop in interface Lifecycle
    • destroy

      default void destroy()
      Specified by:
      destroy in interface DisposableBean