package com.couchbase.client.core.transaction.cleanup;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.events.transaction.CleanupFailedEvent;
import com.couchbase.client.core.cnc.events.transaction.TransactionEvent;
import com.couchbase.client.core.error.transaction.internal.ThreadStopRequestedException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import com.couchbase.client.core.transaction.log.SimpleEventBusLogger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

@Stability.Internal
/* loaded from: input_file:com/couchbase/client/core/transaction/cleanup/CoreTransactionsCleanup.class */
public class CoreTransactionsCleanup {
    public static String CATEGORY = TransactionEvent.DEFAULT_CATEGORY + ".cleanup";
    public static String CATEGORY_STATS = TransactionEvent.DEFAULT_CATEGORY + ".cleanup.stats";
    public static String CATEGORY_CLIENT_RECORD = TransactionEvent.DEFAULT_CATEGORY + ".clientrecord";
    public static String LOST_CATEGORY = TransactionEvent.DEFAULT_CATEGORY + ".cleanup.lost";
    public static String REGULAR_CATEGORY = TransactionEvent.DEFAULT_CATEGORY + ".cleanup.regular";
    private final Core core;
    private final CoreTransactionsConfig config;
    private final DelayQueue<CleanupRequest> cleanupQueue = new DelayQueue<>();
    private volatile boolean stop = false;
    private final CountDownLatch stopLatch;

    @Nullable
    private final LostCleanupDistributed lostCleanup;
    private final SimpleEventBusLogger LOGGER;
    private final SimpleEventBusLogger LOGGER_REGULAR;
    private final CleanerFactory cleanerFactory;

    public CoreTransactionsCleanup(Core core, CoreTransactionsConfig coreTransactionsConfig) {
        this.core = (Core) Objects.requireNonNull(core);
        this.config = (CoreTransactionsConfig) Objects.requireNonNull(coreTransactionsConfig);
        this.LOGGER = new SimpleEventBusLogger(core.context().environment().eventBus(), CATEGORY);
        this.LOGGER_REGULAR = new SimpleEventBusLogger(core.context().environment().eventBus(), REGULAR_CATEGORY);
        this.lostCleanup = coreTransactionsConfig.cleanupConfig().runLostAttemptsCleanupThread() ? new LostCleanupDistributed(core, coreTransactionsConfig, this::getCleaner) : null;
        int i = 0;
        this.cleanerFactory = coreTransactionsConfig.cleanerFactory();
        if (coreTransactionsConfig.cleanupConfig().runRegularAttemptsCleanupThread()) {
            runRegularAttemptsCleanupThread();
            i = 0 + 1;
        }
        this.stopLatch = new CountDownLatch(i);
        coreTransactionsConfig.metadataCollection().ifPresent(collectionIdentifier -> {
            core.openBucket(collectionIdentifier.bucket());
            addToCleanupSet(collectionIdentifier);
        });
        coreTransactionsConfig.cleanupConfig().cleanupSet().forEach(collectionIdentifier2 -> {
            core.openBucket(collectionIdentifier2.bucket());
            addToCleanupSet(collectionIdentifier2);
        });
    }

    public void addToCleanupSet(CollectionIdentifier collectionIdentifier) {
        if (this.lostCleanup != null) {
            this.lostCleanup.addToCleanupSet(collectionIdentifier);
        }
    }

    public Set<CollectionIdentifier> cleanupSet() {
        return this.lostCleanup != null ? this.lostCleanup.cleanupSet() : new HashSet();
    }

    void stopBackgroundProcesses(Duration duration) {
        this.stop = true;
        this.LOGGER.info(String.format("Waiting for %d regular background threads to exit", Long.valueOf(this.stopLatch.getCount())), new Object[0]);
        try {
            if (!this.stopLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
                this.LOGGER.info("Background threads did not stop in expected time {}", duration);
            }
        } catch (InterruptedException e) {
            this.LOGGER.warn("Interrupted while waiting for background threads " + e, new Object[0]);
        }
        if (this.lostCleanup != null) {
            this.lostCleanup.shutdown(duration);
        }
        this.LOGGER.info("Background threads have exitted", new Object[0]);
    }

    private void runRegularAttemptsCleanupThread() {
        Objects.requireNonNull(this.LOGGER);
        this.LOGGER_REGULAR.debug("Starting background cleanup thread to find transactions from this client", new Object[0]);
        Flux.interval(Duration.ofMillis(100L), this.core.context().environment().transactionsSchedulers().schedulerCleanup()).flatMap(l -> {
            if (!this.stop) {
                return Mono.just(l);
            }
            this.LOGGER_REGULAR.info("Stopping background cleanup thread for transactions from this client", new Object[0]);
            this.stopLatch.countDown();
            return Mono.error(new ThreadStopRequestedException());
        }).flatMap(l2 -> {
            CleanupRequest poll;
            ArrayList arrayList = new ArrayList();
            do {
                poll = this.cleanupQueue.poll();
                if (poll != null) {
                    arrayList.add(poll);
                }
            } while (poll != null);
            return Flux.fromIterable(arrayList).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup());
        }).flatMap(cleanupRequest -> {
            return getCleaner().performCleanup(cleanupRequest, true, null).doOnSuccess(transactionCleanupAttemptEvent -> {
                this.LOGGER_REGULAR.debug(String.format("result of cleanup request %s: success=%s", cleanupRequest, Boolean.valueOf(transactionCleanupAttemptEvent.success())), new Object[0]);
            }).onErrorResume(th -> {
                this.core.context().environment().eventBus().publish(new CleanupFailedEvent(cleanupRequest, th));
                this.LOGGER_REGULAR.debug(String.format("error while handling cleanup request %s, leaving for lost cleanup: '%s'", cleanupRequest, th), new Object[0]);
                return Mono.empty();
            });
        }).retryWhen(Retry.allBut(ThreadStopRequestedException.class).exponentialBackoff(Duration.ofMillis(10L), Duration.ofMillis(2000L)).doOnRetry(retryContext -> {
            this.LOGGER_REGULAR.debug(String.format("retrying regular cleanup on error '%s'", retryContext.exception()), new Object[0]);
        }).retryMax(100000L).toReactorRetry()).subscribe(transactionCleanupAttemptEvent -> {
        }, th -> {
            if (th instanceof ThreadStopRequestedException) {
                return;
            }
            this.LOGGER_REGULAR.warn("regular cleanup thread ended with exception " + th, new Object[0]);
        }, () -> {
            this.LOGGER_REGULAR.warn("regular cleanup thread ending", new Object[0]);
        });
    }

    public TransactionsCleaner getCleaner() {
        return this.cleanerFactory.create(this.core);
    }

    public Optional<Integer> cleanupQueueLength() {
        return this.config.cleanupConfig().runRegularAttemptsCleanupThread() ? Optional.of(Integer.valueOf(this.cleanupQueue.size())) : Optional.empty();
    }

    public void add(CleanupRequest cleanupRequest) {
        this.cleanupQueue.add((DelayQueue<CleanupRequest>) cleanupRequest);
    }

    public Mono<Void> shutdown(Duration duration) {
        return Mono.fromRunnable(() -> {
            stopBackgroundProcesses(duration);
        });
    }
}
