Class CachingPulsarProducerFactory<T>

java.lang.Object
org.springframework.pulsar.core.DefaultPulsarProducerFactory<T>
org.springframework.pulsar.core.CachingPulsarProducerFactory<T>
Type Parameters:
T - the type of the messages sent by the created producers.
All Implemented Interfaces:
DisposableBean, PulsarProducerFactory<T>

public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactory<T> implements DisposableBean
A PulsarProducerFactory that extends the default implementation by caching the created producers.

The created producer is wrapped in a proxy so that calls to Producer.close() do not actually close it. The actual close occurs when the producer is evicted from the cache or when DisposableBean.destroy() is invoked.

Proxied producers are cached with least-recently-used (LRU) eviction. A producer is evicted when it has not been used within the configured time period (cacheExpireAfterAccess).
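The close-suppressing proxy described above can be illustrated with a minimal sketch built on the JDK's java.lang.reflect.Proxy. This is not this class's actual implementation: SimpleProducer, RealProducer, and wrap are hypothetical names standing in for the Pulsar Producer and the factory's internal wrapping, and they are used only so the example runs without Pulsar on the classpath.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class CloseSuppressingProxySketch {

    /** Hypothetical stand-in for org.apache.pulsar.client.api.Producer. */
    public interface SimpleProducer {
        void send(String message);
        void close();
        boolean isClosed();
    }

    /** Hypothetical producer that records whether it was really closed. */
    public static class RealProducer implements SimpleProducer {
        private boolean closed;
        private int sent;

        @Override
        public void send(String message) {
            if (closed) {
                throw new IllegalStateException("producer is closed");
            }
            sent++;
        }

        @Override
        public void close() {
            closed = true;
        }

        @Override
        public boolean isClosed() {
            return closed;
        }

        public int sentCount() {
            return sent;
        }
    }

    /** Wraps the target so that close() is ignored and all other calls are delegated. */
    public static SimpleProducer wrap(SimpleProducer target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("close") && method.getParameterCount() == 0) {
                return null; // swallow the close; the cache owner closes the target later
            }
            return method.invoke(target, args);
        };
        return (SimpleProducer) Proxy.newProxyInstance(
                SimpleProducer.class.getClassLoader(),
                new Class<?>[] { SimpleProducer.class },
                handler);
    }

    public static void main(String[] args) {
        RealProducer real = new RealProducer();
        SimpleProducer proxied = wrap(real);
        proxied.send("hello");
        proxied.close();                        // ignored by the proxy
        System.out.println(real.isClosed());    // prints false
        real.close();                           // what eviction or destroy() would do
        System.out.println(real.isClosed());    // prints true
    }
}
```

In this sketch the caller can keep using the try-with-resources or close() idiom without invalidating a shared producer; only the owner of the underlying instance decides when it is really closed.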

Author:
Chris Bono, Alexander Preuß, Christophe Bornet
  • Constructor Details

    • CachingPulsarProducerFactory

      public CachingPulsarProducerFactory(org.apache.pulsar.client.api.PulsarClient pulsarClient, Map<String,Object> producerConfig, Duration cacheExpireAfterAccess, Long cacheMaximumSize, Integer cacheInitialCapacity)
      Construct a caching producer factory with the specified values for the cache configuration.
      Parameters:
      pulsarClient - the client used to create the producers
      producerConfig - the configuration to use when creating a producer
      cacheExpireAfterAccess - how long an entry may remain unused before it expires from the cache
      cacheMaximumSize - the maximum number of entries the cache may hold
      cacheInitialCapacity - the initial capacity of the cache
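To make the three cache parameters concrete, here is a minimal sketch of an access-ordered cache with expire-after-access and a maximum size, written with only the JDK. It is an illustration under stated assumptions, not the cache this class actually uses: ExpiringLruCacheSketch, its constructor arguments, the injected clock, and the onEvict callback are all hypothetical names chosen for the example.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;

public class ExpiringLruCacheSketch<K, V> {

    /** A cached value plus the time (in nanoseconds) it was last accessed. */
    private static final class Slot<V> {
        final V value;
        long lastAccessNanos;

        Slot(V value, long now) {
            this.value = value;
            this.lastAccessNanos = now;
        }
    }

    private final long expireAfterAccessNanos;
    private final long maximumSize;
    private final LongSupplier clock;   // hypothetical: injected so the example is deterministic
    private final Consumer<V> onEvict;  // hypothetical: where a real producer would be closed
    private final LinkedHashMap<K, Slot<V>> slots;

    public ExpiringLruCacheSketch(Duration expireAfterAccess, long maximumSize,
            int initialCapacity, LongSupplier clock, Consumer<V> onEvict) {
        this.expireAfterAccessNanos = expireAfterAccess.toNanos();
        this.maximumSize = maximumSize;
        this.clock = clock;
        this.onEvict = onEvict;
        // accessOrder = true keeps the least recently used entry at the head
        this.slots = new LinkedHashMap<>(initialCapacity, 0.75f, true);
    }

    /** Returns the cached value for the key, creating it with the loader on a miss. */
    public synchronized V get(K key, Function<K, V> loader) {
        long now = clock.getAsLong();
        evictExpired(now);
        Slot<V> slot = slots.get(key); // a hit moves the entry to the tail
        if (slot == null) {
            slot = new Slot<>(loader.apply(key), now);
            slots.put(key, slot);
            evictOverflow();
        } else {
            slot.lastAccessNanos = now;
        }
        return slot.value;
    }

    /** Evicts every entry, as a destroy() callback might. */
    public synchronized void clear() {
        for (Slot<V> slot : slots.values()) {
            onEvict.accept(slot.value);
        }
        slots.clear();
    }

    public synchronized int size() {
        return slots.size();
    }

    private void evictExpired(long now) {
        Iterator<Slot<V>> it = slots.values().iterator();
        while (it.hasNext()) {
            Slot<V> slot = it.next();
            if (now - slot.lastAccessNanos >= expireAfterAccessNanos) {
                it.remove();
                onEvict.accept(slot.value);
            }
        }
    }

    private void evictOverflow() {
        Iterator<Slot<V>> it = slots.values().iterator();
        while (slots.size() > maximumSize && it.hasNext()) {
            Slot<V> eldest = it.next(); // head of the map is the least recently used
            it.remove();
            onEvict.accept(eldest.value);
        }
    }

    public static void main(String[] args) {
        long[] now = { 0L };
        List<String> evicted = new ArrayList<>();
        ExpiringLruCacheSketch<String, String> cache = new ExpiringLruCacheSketch<>(
                Duration.ofMinutes(1), 2, 16, () -> now[0], evicted::add);
        cache.get("topic-a", k -> "producer-a");
        cache.get("topic-b", k -> "producer-b");
        cache.get("topic-a", k -> "unused");     // hit: topic-b becomes least recently used
        cache.get("topic-c", k -> "producer-c"); // exceeds maximum size of 2
        System.out.println(evicted);             // prints [producer-b]
        now[0] += Duration.ofMinutes(2).toNanos();
        cache.get("topic-d", k -> "producer-d"); // topic-a and topic-c have expired
        System.out.println(evicted);             // prints [producer-b, producer-a, producer-c]
    }
}
```

The eviction callback is the point where, in a producer cache, the underlying producer would actually be closed.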
  • Method Details

    • doCreateProducer

      protected org.apache.pulsar.client.api.Producer<T> doCreateProducer(org.apache.pulsar.client.api.Schema<T> schema, @Nullable String topic, @Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers)
      Description copied from class: DefaultPulsarProducerFactory
      Create the actual producer.
      Overrides:
      doCreateProducer in class DefaultPulsarProducerFactory<T>
      Parameters:
      schema - the schema of the messages to be sent
      topic - the topic the producer will send messages to, or null to use the default topic
      encryptionKeys - the encryption keys for the producer, which replace the default encryption keys, or null to use the defaults. Beware that ProducerBuilder offers only ProducerBuilder.addEncryptionKey(java.lang.String) and has no method to replace the encryption keys.
      customizers - the optional list of customizers to apply to the producer builder
      Returns:
      the created producer
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean