package org.apache.pulsar.functions.instance;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.0.10.jar:org/apache/pulsar/functions/instance/ProducerCache.class */
public class ProducerCache implements Closeable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ProducerCache.class);
    private static final int PRODUCER_CACHE_TIMEOUT_SECONDS = Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_TIMEOUT_SECONDS", "300"));
    private static final int PRODUCER_CACHE_MAX_SIZE = Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_MAX_SIZE", "10000"));
    private static final int FLUSH_OR_CLOSE_TIMEOUT_SECONDS = 60;
    private final Cache<ProducerCacheKey, Producer<?>> cache;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new CopyOnWriteArrayList<>();

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.0.10.jar:org/apache/pulsar/functions/instance/ProducerCache$CacheArea.class */
    public enum CacheArea {
        CONTEXT_CACHE,
        SINK_RECORD_CACHE
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.0.10.jar:org/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey.class */
    public static final class ProducerCacheKey extends Record {
        private final CacheArea cacheArea;
        private final String topic;
        private final Object additionalKey;

        ProducerCacheKey(CacheArea cacheArea, String str, Object obj) {
            this.cacheArea = cacheArea;
            this.topic = str;
            this.additionalKey = obj;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ProducerCacheKey.class), ProducerCacheKey.class, "cacheArea;topic;additionalKey", "FIELD:Lorg/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey;->cacheArea:Lorg/apache/pulsar/functions/instance/ProducerCache$CacheArea;", "FIELD:Lorg/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey;->topic:Ljava/lang/String;", "FIELD:Lorg/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey;->additionalKey:Ljava/lang/Object;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ProducerCacheKey.class), ProducerCacheKey.class, "cacheArea;topic;additionalKey", "FIELD:Lorg/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey;->cacheArea:Lorg/apache/pulsar/functions/instance/ProducerCache$CacheArea;", "FIELD:Lorg/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey;->topic:Ljava/lang/String;", "FIELD:Lorg/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey;->additionalKey:Ljava/lang/Object;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ProducerCacheKey.class, Object.class), ProducerCacheKey.class, "cacheArea;topic;additionalKey", "FIELD:Lorg/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey;->cacheArea:Lorg/apache/pulsar/functions/instance/ProducerCache$CacheArea;", "FIELD:Lorg/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey;->topic:Ljava/lang/String;", "FIELD:Lorg/apache/pulsar/functions/instance/ProducerCache$ProducerCacheKey;->additionalKey:Ljava/lang/Object;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public CacheArea cacheArea() {
            return this.cacheArea;
        }

        public String topic() {
            return this.topic;
        }

        public Object additionalKey() {
            return this.additionalKey;
        }
    }

    public ProducerCache() {
        Caffeine maximumWeight = Caffeine.newBuilder().scheduler(Scheduler.systemScheduler()).removalListener((producerCacheKey, producer, removalCause) -> {
            log.info("Closing producer for topic {}, cause {}", producerCacheKey.topic(), removalCause);
            CompletableFuture<Void> exceptionally = CompletableFuture.supplyAsync(() -> {
                return producer.flushAsync();
            }, (v0) -> {
                v0.run();
            }).orTimeout(60L, TimeUnit.SECONDS).exceptionally(th -> {
                Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
                if (unwrapCompletionException instanceof PulsarClientException.AlreadyClosedException) {
                    log.error("Error flushing producer for topic {} due to AlreadyClosedException", producerCacheKey.topic());
                    return null;
                }
                log.error("Error flushing producer for topic {}", producerCacheKey.topic(), unwrapCompletionException);
                return null;
            }).thenCompose(completableFuture -> {
                return producer.closeAsync().orTimeout(60L, TimeUnit.SECONDS);
            }).exceptionally(th2 -> {
                Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th2);
                if (unwrapCompletionException instanceof PulsarClientException.AlreadyClosedException) {
                    log.error("Error closing producer for topic {} due to AlreadyClosedException", producerCacheKey.topic());
                    return null;
                }
                log.error("Error closing producer for topic {}", producerCacheKey.topic(), unwrapCompletionException);
                return null;
            });
            if (this.closed.get()) {
                this.closeFutures.add(exceptionally);
            }
        }).weigher((producerCacheKey2, producer2) -> {
            return Math.max(producer2.getNumOfPartitions(), 1);
        }).maximumWeight(PRODUCER_CACHE_MAX_SIZE);
        if (PRODUCER_CACHE_TIMEOUT_SECONDS > 0) {
            maximumWeight.expireAfterAccess(Duration.ofSeconds(PRODUCER_CACHE_TIMEOUT_SECONDS));
        }
        this.cache = maximumWeight.build();
    }

    public <T> Producer<T> getOrCreateProducer(CacheArea cacheArea, String str, Object obj, Callable<Producer<T>> callable) {
        if (this.closed.get()) {
            throw new IllegalStateException("ProducerCache is already closed");
        }
        return (Producer) this.cache.get(new ProducerCacheKey(cacheArea, str, obj), producerCacheKey -> {
            try {
                return (Producer) callable.call();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e2) {
                throw new RuntimeException("Unable to create producer for topic '" + str + "'", e2);
            }
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.cache.invalidateAll();
            try {
                FutureUtil.waitForAll(this.closeFutures).get();
            } catch (InterruptedException | ExecutionException e) {
                log.warn("Failed to close producers", e);
            }
        }
    }

    @VisibleForTesting
    public boolean containsKey(CacheArea cacheArea, String str) {
        return containsKey(cacheArea, str, null);
    }

    @VisibleForTesting
    public boolean containsKey(CacheArea cacheArea, String str, Object obj) {
        return this.cache.getIfPresent(new ProducerCacheKey(cacheArea, str, obj)) != null;
    }
}
