package org.apache.pulsar.reactive.client.internal.adapter;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/pulsar/reactive/client/internal/adapter/ProducerCache.class */
public class ProducerCache implements ReactiveMessageSenderCache {
    private final ProducerCacheProvider cacheProvider;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProducerCache(ProducerCacheProvider producerCacheProvider) {
        this.cacheProvider = producerCacheProvider;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> CompletableFuture<ProducerCacheEntry> createCacheEntry(Mono<Producer<T>> mono, Supplier<PublisherTransformer> supplier) {
        return mono.map(producer -> {
            return new ProducerCacheEntry(producer, supplier);
        }).toFuture();
    }

    private <T> Mono<ProducerCacheEntry> getProducerCacheEntry(ProducerCacheKey producerCacheKey, Mono<Producer<T>> mono, Supplier<PublisherTransformer> supplier) {
        return AdapterImplementationFactory.adaptPulsarFuture(() -> {
            return this.cacheProvider.getOrCreateCachedEntry(producerCacheKey, producerCacheKey2 -> {
                return createCacheEntry(mono, supplier);
            });
        }).flatMap(producerCacheEntry -> {
            return producerCacheEntry.recreateIfClosed(mono);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T, R> Mono<R> usingCachedProducer(ProducerCacheKey producerCacheKey, Mono<Producer<T>> mono, Supplier<PublisherTransformer> supplier, BiFunction<Producer<T>, PublisherTransformer, Mono<R>> biFunction) {
        return Mono.usingWhen(leaseCacheEntry(producerCacheKey, mono, supplier), producerCacheEntry -> {
            return (Mono) biFunction.apply(producerCacheEntry.getProducer(), producerCacheEntry.getProducerActionTransformer());
        }, this::returnCacheEntry);
    }

    private Mono<Object> returnCacheEntry(ProducerCacheEntry producerCacheEntry) {
        Objects.requireNonNull(producerCacheEntry);
        return Mono.fromRunnable(producerCacheEntry::releaseLease);
    }

    private <T> Mono<ProducerCacheEntry> leaseCacheEntry(ProducerCacheKey producerCacheKey, Mono<Producer<T>> mono, Supplier<PublisherTransformer> supplier) {
        return getProducerCacheEntry(producerCacheKey, mono, supplier).doOnNext((v0) -> {
            v0.activateLease();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T, R> Flux<R> usingCachedProducerMany(ProducerCacheKey producerCacheKey, Mono<Producer<T>> mono, Supplier<PublisherTransformer> supplier, BiFunction<Producer<T>, PublisherTransformer, Flux<R>> biFunction) {
        return Flux.usingWhen(leaseCacheEntry(producerCacheKey, mono, supplier), producerCacheEntry -> {
            return (Publisher) biFunction.apply(producerCacheEntry.getProducer(), producerCacheEntry.getProducerActionTransformer());
        }, this::returnCacheEntry);
    }

    public void close() throws Exception {
        if (this.cacheProvider != null) {
            this.cacheProvider.close();
        }
    }
}
