package com.couchbase.client.core.transaction.cleanup;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.cnc.TracingIdentifiers;
import com.couchbase.client.core.cnc.events.transaction.TransactionCleanupAttemptEvent;
import com.couchbase.client.core.cnc.events.transaction.TransactionCleanupEndRunEvent;
import com.couchbase.client.core.cnc.events.transaction.TransactionCleanupStartRunEvent;
import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.core.error.transaction.internal.ThreadStopRequestedException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.transaction.atr.ActiveTransactionRecordIds;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecord;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecordEntry;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecordUtil;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecords;
import com.couchbase.client.core.transaction.components.CasMode;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.support.SpanWrapperUtil;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.MonoBridge;
import de.lmu.ifi.dbs.elki.data.HierarchicalClassLabel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.compress.harmony.pack200.PackingOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

@Stability.Internal
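/*
 * Runs the distributed cleanup of lost transactions: once a second it walks the registered
 * collections and makes sure each one has a background cleanup task scanning its ATRs.
 * (Decompiled; loaded from: input_file:com/couchbase/client/core/transaction/cleanup/LostCleanupDistributed.class)
 */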
public class LostCleanupDistributed {
    /** Logger for the lost-transactions cleanup category. */
    private static final Logger LOGGER = LoggerFactory.getLogger(CoreTransactionsCleanup.LOST_CATEGORY);
    /** Safety margin handed to the ATR-entry expiry check when scanning ATRs. */
    private static final Duration DEFAULT_SAFETY_MARGIN = Duration.ofMillis(1500);

    private final Core core;
    private final ClientRecord clientRecord;
    private final CoreTransactionsConfig config;
    /** Supplies the cleaner used for each ATR that is processed. */
    private final Supplier<TransactionsCleaner> cleanerSupplier;
    /** Subscription of the once-per-second check of {@link #cleanupSet}. */
    private Disposable cleanupThreadLauncher;
    /** Cleanup window taken from the configuration at construction time. */
    private final Duration actualCleanupWindow;
    /** Set once shutdown has been requested; read by the background tasks. */
    private volatile boolean stop = false;
    /** Random identifier of this client. */
    private final String clientUuid = UUID.randomUUID().toString();
    /** Collections that have been registered for lost-transaction cleanup. */
    private final Set<CollectionIdentifier> cleanupSet = ConcurrentHashMap.newKeySet();
    /** Collections that currently have a running cleanup task; also used as the monitor guarding start/stop. */
    private final Map<CollectionIdentifier, Disposable> actuallyBeingCleaned = new ConcurrentHashMap<>();
    /** Log prefix: "Client " followed by the first five characters of the client UUID. */
    private final String bp = "Client " + this.clientUuid.substring(0, 5);

    /**
     * Creates the cleanup coordinator and immediately starts the periodic check of the cleanup set.
     *
     * @param core the core instance; must not be null
     * @param coreTransactionsConfig the transactions configuration; must not be null
     * @param supplier supplies the cleaner used for each processed ATR; must not be null
     * @throws NullPointerException if any argument is null
     */
    public LostCleanupDistributed(Core core, CoreTransactionsConfig coreTransactionsConfig, Supplier<TransactionsCleaner> supplier) {
        // Validate every argument before any of them is dereferenced, so a null argument
        // fails fast and no client record is created for a half-constructed instance.
        this.core = Objects.requireNonNull(core, "core");
        this.config = Objects.requireNonNull(coreTransactionsConfig, "coreTransactionsConfig");
        this.cleanerSupplier = Objects.requireNonNull(supplier, "supplier");
        this.clientRecord = coreTransactionsConfig.clientRecordFactory().create(core);
        this.actualCleanupWindow = coreTransactionsConfig.cleanupConfig().cleanupWindow();
        start();
    }

    /**
     * Registers a collection for lost-transaction cleanup. Adding a collection that is
     * already registered has no further effect on the set.
     *
     * @param collectionIdentifier the collection to register
     */
    public void addToCleanupSet(CollectionIdentifier collectionIdentifier) {
        cleanupSet.add(collectionIdentifier);
    }

    /**
     * Returns a snapshot of the collections registered for cleanup.
     *
     * @return a new set; modifying it does not affect this instance
     */
    public Set<CollectionIdentifier> cleanupSet() {
        return new HashSet<>(this.cleanupSet);
    }

    /**
     * Requests all background cleanup tasks to stop, waits (polling every 50ms) until none is
     * registered as running or {@code duration} has elapsed, and then removes this client from the
     * client record of every collection that was being cleaned when shutdown started.
     *
     * <p>A failure while removing the client is logged and swallowed.
     *
     * @param duration the maximum time to wait for running cleanup tasks to finish
     * @return a Mono that completes once the steps above are done
     */
    public Mono<Void> shutdown(Duration duration) {
        return Mono.fromCallable(() -> {
            Set<CollectionIdentifier> beingCleaned;
            // Hold the monitor only long enough to take the snapshot and raise the stop flag.
            // createThreadForCollectionIfNeeded checks the flag under the same monitor, so no
            // new task can be registered afterwards, and it is not blocked while we wait below.
            synchronized (this.actuallyBeingCleaned) {
                beingCleaned = new HashSet<>(this.actuallyBeingCleaned.keySet());
                this.stop = true;
            }
            // Stop the once-per-second launcher; otherwise its interval keeps ticking forever.
            Disposable launcher = this.cleanupThreadLauncher;
            if (launcher != null) {
                launcher.dispose();
            }
            LOGGER.info("{} stopping lost cleanup process, {} threads running", this.bp, Integer.valueOf(beingCleaned.size()));
            long startNanos = System.nanoTime();
            while (true) {
                if (Duration.ofNanos(System.nanoTime() - startNanos).compareTo(duration) > 0) {
                    LOGGER.warn("Exceeded timeout of {}ms while waiting for transactions cleanup thread to finish", Long.valueOf(duration.toMillis()));
                    break;
                }
                if (this.actuallyBeingCleaned.isEmpty()) {
                    break;
                }
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so callers further up can observe it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            return beingCleaned;
        }).flatMap(set -> {
            return this.clientRecord.removeClientFromClientRecord(this.clientUuid, set).then().onErrorResume(th -> {
                LOGGER.warn("{} failed to remove from cleanup set with err: {}", this.bp, th);
                return Mono.empty();
            });
        }).doOnTerminate(() -> {
            LOGGER.info("{} stopped lost cleanup process and removed client from client records", this.bp);
        });
    }

    /**
     * Selects the ATR ids this client is responsible for: every {@code i2}-th id of
     * {@code ActiveTransactionRecordIds.allAtrs(i3)}, starting at index {@code i}.
     *
     * @param i index of the first ATR to take (the caller passes this client's index)
     * @param i2 step between taken ATRs (the caller passes the number of active clients); must be positive
     * @param i3 number of ATRs, passed through to {@code ActiveTransactionRecordIds.allAtrs}
     * @return the selected ATR ids; empty when {@code i} is at or beyond the number of ATRs
     * @throws IllegalArgumentException if {@code i2} is not positive (a zero step would never terminate)
     */
    private static List<String> atrsToHandle(int i, int i2, int i3) {
        if (i2 <= 0) {
            throw new IllegalArgumentException("Step between ATRs must be positive but was " + i2);
        }
        List<String> allAtrs = ActiveTransactionRecordIds.allAtrs(i3);
        List<String> selected = new ArrayList<>();
        for (int index = i; index < allAtrs.size(); index += i2) {
            selected.add(allAtrs.get(index));
        }
        return selected;
    }

    /** Looks up the request tracer from the core's resources. */
    private RequestTracer tracer() {
        return core.context().coreResources().requestTracer();
    }

    /**
     * Fetches one ATR and cleans up every entry in it that has expired.
     *
     * <p>On subscription: runs the {@code beforeAtrGet} hook, reads the ATR using the KV timeout, and
     * completes empty if the ATR does not exist. Otherwise the entries for which
     * {@code hasExpired(duration.toMillis())} holds are cleaned up one after another on the cleanup
     * scheduler. A failed cleanup of a single entry is counted in the stats and skipped; any other
     * error is swallowed, so the returned Flux never signals an error.
     *
     * @param str prefix used in log messages
     * @param collectionIdentifier the collection holding the ATR
     * @param str2 the id of the ATR
     * @param activeTransactionRecordStats receives the results (entry count, expired entries, errors, attempt counters)
     * @param duration safety margin passed to the expiry check
     * @param spanWrapper parent span for the tracing span created for this ATR
     * @return the cleanup attempt events of the expired entries that were cleaned up successfully
     */
    public Flux<TransactionCleanupAttemptEvent> handleATRCleanup(String str, CollectionIdentifier collectionIdentifier, String str2, ActiveTransactionRecordStats activeTransactionRecordStats, Duration duration, SpanWrapper spanWrapper) {
        return Flux.defer(() -> {
            long startNanos = System.nanoTime();
            // Time at which the ATR fetch returned; stays 0 if the fetch never emits.
            AtomicLong atrFetchedNanos = new AtomicLong(0L);
            AtomicReference<CasMode> casMode = new AtomicReference<>(CasMode.UNKNOWN);
            SpanWrapper createOp = SpanWrapperUtil.createOp(null, tracer(), collectionIdentifier, str2, TracingIdentifiers.TRANSACTION_CLEANUP_ATR, spanWrapper);
            TransactionsCleaner transactionsCleaner = this.cleanerSupplier.get();
            return transactionsCleaner.hooks().beforeAtrGet.apply(str2).then(ActiveTransactionRecord.getAtr(this.core, collectionIdentifier, str2, this.core.context().environment().timeoutConfig().kvTimeout(), createOp)).flatMap(optional -> {
                atrFetchedNanos.set(System.nanoTime());
                if (!optional.isPresent()) {
                    // No such ATR: nothing to clean up.
                    return Mono.empty();
                }
                casMode.set(((ActiveTransactionRecords) optional.get()).casMode());
                return Mono.just((ActiveTransactionRecords) optional.get());
            }).doOnError(th -> {
                LOGGER.debug("{} Got error '{}' while getting ATR {}/", str, th, ActiveTransactionRecordUtil.getAtrDebug(collectionIdentifier, str2));
                activeTransactionRecordStats.errored = Optional.of(th);
            }).flatMapMany(activeTransactionRecords -> {
                activeTransactionRecordStats.numEntries = activeTransactionRecords.entries().size();
                activeTransactionRecordStats.exists = true;
                activeTransactionRecordStats.errored = Optional.empty();
                Collection<ActiveTransactionRecordEntry> expired = (Collection) activeTransactionRecords.entries().stream().filter(activeTransactionRecordEntry -> {
                    return activeTransactionRecordEntry.hasExpired(duration.toMillis());
                }).collect(Collectors.toList());
                activeTransactionRecordStats.expired = expired;
                createOp.attribute(TracingIdentifiers.ATTR_TRANSACTION_ATR_ENTRIES_COUNT, Integer.valueOf(activeTransactionRecordStats.numEntries));
                createOp.attribute(TracingIdentifiers.ATTR_TRANSACTION_ATR_ENTRIES_EXPIRED, Integer.valueOf(activeTransactionRecordStats.expired.size()));
                return Flux.fromIterable(expired).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup());
            }).concatMap(activeTransactionRecordEntry -> {
                // The CAS is divided by one million for the "now" value of the message
                // (presumably nanoseconds to milliseconds - TODO confirm the unit of cas()).
                LOGGER.trace("{} Found expired attempt {}, expires after {}, age {} (started {}, now {})", str, activeTransactionRecordEntry.attemptId(), activeTransactionRecordEntry.expiresAfterMillis().orElse(-1), Long.valueOf(activeTransactionRecordEntry.ageMillis()), activeTransactionRecordEntry.timestampStartMillis().orElse(0L), Long.valueOf(activeTransactionRecordEntry.cas() / 1_000_000L));
                activeTransactionRecordStats.expiredEntryCleanupTotalAttempts.incrementAndGet();
                return transactionsCleaner.performCleanup(CleanupRequest.fromAtrEntry(collectionIdentifier, activeTransactionRecordEntry), false, createOp).onErrorResume(th2 -> {
                    // One entry failing must not stop the remaining entries from being cleaned.
                    activeTransactionRecordStats.expiredEntryCleanupFailedAttempts.incrementAndGet();
                    return Mono.empty();
                });
            }).doOnError(th2 -> {
                createOp.finish(th2);
            }).doOnTerminate(() -> {
                // NOTE(review): on the error path the span was already finished by doOnError above
                // and is finished again here - confirm that finishing twice is harmless.
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("{} processed ATR {} after {}µs ({} fetching ATR), CAS={}: {}", str, ActiveTransactionRecordUtil.getAtrDebug(collectionIdentifier, str2), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos)), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(atrFetchedNanos.get() - startNanos)), casMode.get(), activeTransactionRecordStats);
                }
                createOp.finish();
            }).onErrorResume(th3 -> {
                // Errors were recorded in the stats and the span; the caller only sees completion.
                return Mono.empty();
            });
        });
    }

    /** Starts the background processing by launching the periodic check of the cleanup set. */
    private void start() {
        this.periodicallyCheckCleanupSet();
    }

    /**
     * Starts a background cleanup task for the collection unless shutdown has been requested or a
     * task is already registered for it. The started task is registered in
     * {@code actuallyBeingCleaned} and unregisters itself when it terminates.
     *
     * @param collectionIdentifier the collection to clean
     * @return a Mono that completes empty; the work happens when it is subscribed
     */
    Mono<Void> createThreadForCollectionIfNeeded(CollectionIdentifier collectionIdentifier) {
        return Mono.defer(() -> {
            // Same monitor as shutdown(), so the stop flag and the registration are seen consistently.
            synchronized (this.actuallyBeingCleaned) {
                if (this.stop || this.actuallyBeingCleaned.containsKey(collectionIdentifier)) {
                    return Mono.empty();
                }
                // "bucket.scope.collection", with "-" standing in for an absent scope or collection.
                String redactedName = RedactableArgument.redactMeta(collectionIdentifier.bucket() + "." + collectionIdentifier.scope().orElse("-") + "." + collectionIdentifier.collection().orElse("-")).toString();
                LOGGER.info("{} will start cleaning lost transactions on collection {}", this.bp, redactedName);
                Disposable running = perCollectionThread(collectionIdentifier).onErrorResume(th -> {
                    // A requested stop is the normal way for the task to end; anything else is logged.
                    if (!(th instanceof ThreadStopRequestedException)) {
                        LOGGER.warn("{} {} lost transactions thread has ended on error {} (will be retried)", this.bp, redactedName, DebugUtil.dbg(th));
                    }
                    return Mono.empty();
                }).doOnTerminate(() -> {
                    LOGGER.debug("{} {} lost transactions thread has ended", this.bp, redactedName);
                    // Unregister so that a later periodic check can start a fresh task.
                    this.actuallyBeingCleaned.remove(collectionIdentifier);
                }).subscribe();
                this.actuallyBeingCleaned.put(collectionIdentifier, running);
                return Mono.empty();
            }
        });
    }

    /**
     * Subscribes to a one-second interval on the cleanup scheduler. On every tick each collection
     * in the cleanup set is passed, one at a time, to {@code createThreadForCollectionIfNeeded}.
     * The subscription is stored in {@code cleanupThreadLauncher}.
     */
    private void periodicallyCheckCleanupSet() {
        this.cleanupThreadLauncher = Flux.interval(Duration.ZERO, Duration.ofSeconds(1L), this.core.context().environment().transactionsSchedulers().schedulerCleanup()).concatMap(tick -> {
            return Flux.fromIterable(this.cleanupSet);
        }).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup()).concatMap(this::createThreadForCollectionIfNeeded).doOnCancel(() -> {
            LOGGER.info("{} has been told to cancel", this.bp);
        }).subscribe(ignored -> {
            LOGGER.warn("{} lost transactions cleanup thread(s) ending", this.bp);
        }, th -> {
            if (th instanceof ThreadStopRequestedException) {
                LOGGER.info("{} lost transactions cleanup told to stop", this.bp);
            } else {
                // Pass the error text as an argument instead of concatenating it into the format
                // string, so that braces in the exception message cannot be read as placeholders.
                LOGGER.warn("{} lost transactions cleanup ended with exception {}", this.bp, String.valueOf(th));
            }
        });
    }

    /**
     * Builds the endless cleanup loop for one collection.
     *
     * <p>Each run opens a tracing span, processes the client record to learn this client's index and
     * the number of active clients, works out which ATRs this client owns, and then checks those ATRs
     * spread evenly over the cleanup window. A start-run event is published before the ATRs are
     * checked and an end-run event afterwards. Runs are repeated indefinitely; a failed run is retried
     * with exponential backoff unless the error is a {@code ThreadStopRequestedException} or an
     * {@code AccessErrorException}.
     *
     * @param collectionIdentifier the collection to clean
     * @return a Mono that only terminates with an error that is not retried
     */
    private Mono<Void> perCollectionThread(CollectionIdentifier collectionIdentifier) {
        return Mono.defer(() -> {
            // Log prefix: "lost/<bucket.scope.collection>/clientId=<first five chars of the client UUID>".
            String str = "lost/" + RedactableArgument.redactMeta(collectionIdentifier.bucket() + "." + collectionIdentifier.scope().orElse("-") + "." + collectionIdentifier.collection().orElse("-")) + "/clientId=" + this.clientUuid.substring(0, 5);
            // Span of the current run; replaced at the start of every run.
            AtomicReference<SpanWrapper> windowSpan = new AtomicReference<>();
            this.core.openBucket(collectionIdentifier.bucket());
            return Mono.fromRunnable(() -> {
                windowSpan.set(SpanWrapperUtil.createOp(null, tracer(), collectionIdentifier, null, TracingIdentifiers.TRANSACTION_CLEANUP_WINDOW, null).attribute(TracingIdentifiers.ATTR_TRANSACTION_CLEANUP_CLIENT_ID, this.clientUuid).attribute(TracingIdentifiers.ATTR_TRANSACTION_CLEANUP_WINDOW, Long.valueOf(this.config.cleanupConfig().cleanupWindow().toMillis())));
            }).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup())
            // Deferred so that the span is read after the runnable above has created it (and again on
            // every repeat/retry); reading it while assembling the chain would always yield null.
            .then(Mono.defer(() -> this.clientRecord.processClient(this.clientUuid, collectionIdentifier, this.config, windowSpan.get()))).flatMap(clientRecordDetails -> {
                long runStartNanos = System.nanoTime();
                Map<String, ActiveTransactionRecordStats> statsByAtr = new HashMap<>();
                List<String> ownedAtrs = atrsToHandle(clientRecordDetails.indexOfThisClient(), clientRecordDetails.numActiveClients(), this.config.numAtrs());
                windowSpan.get().attribute(TracingIdentifiers.ATTR_TRANSACTION_CLEANUP_NUM_ATRS, Integer.valueOf(ownedAtrs.size()));
                windowSpan.get().attribute(TracingIdentifiers.ATTR_TRANSACTION_CLEANUP_NUM_ACTIVE, Integer.valueOf(clientRecordDetails.numActiveClients()));
                windowSpan.get().attribute(TracingIdentifiers.ATTR_TRANSACTION_CLEANUP_NUM_EXPIRED, Integer.valueOf(clientRecordDetails.numExpiredClients()));
                // Interval between two ATR checks, in nanoseconds, so that all owned ATRs fit into one window.
                // NOTE(review): an empty ownedAtrs list makes this division throw ArithmeticException, which
                // then goes through the backoff retry below - confirm whether that case can occur.
                long checkAtrEveryNanos = Math.max(1L, this.actualCleanupWindow.toNanos() / ownedAtrs.size());
                if (ownedAtrs.size() >= this.config.numAtrs()) {
                    LOGGER.trace("{} owns all {} ATRs and will check them over next {}mills, checking an ATR every {}nanos", str, Integer.valueOf(this.config.numAtrs()), Long.valueOf(this.actualCleanupWindow.toMillis()), Long.valueOf(checkAtrEveryNanos));
                }
                // The interval is a nanosecond value, so it must be converted with ofNanos.
                TransactionCleanupStartRunEvent startRunEvent = new TransactionCleanupStartRunEvent(collectionIdentifier.bucket(), collectionIdentifier.scope().orElse("_default"), collectionIdentifier.collection().orElse("_default"), this.clientUuid, clientRecordDetails, this.actualCleanupWindow, ownedAtrs.size(), this.config.numAtrs(), Duration.ofNanos(checkAtrEveryNanos));
                this.core.context().environment().eventBus().publish(startRunEvent);
                // Zipping with the interval releases one ATR id per tick.
                return Flux.zip(Flux.fromIterable(ownedAtrs), Flux.interval(Duration.ofNanos(checkAtrEveryNanos))).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup()).flatMap(tuple2 -> {
                    String atrId = tuple2.getT1();
                    LOGGER.trace("{} checking for lost txns in atr {}", str, ActiveTransactionRecordUtil.getAtrDebug(collectionIdentifier, atrId));
                    ActiveTransactionRecordStats activeTransactionRecordStats = new ActiveTransactionRecordStats();
                    return new MonoBridge(checkIfThreadStopped(collectionIdentifier).thenMany(handleATRCleanup(str, collectionIdentifier, atrId, activeTransactionRecordStats, DEFAULT_SAFETY_MARGIN, windowSpan.get())).then(Mono.fromRunnable(() -> {
                        statsByAtr.put(atrId, activeTransactionRecordStats);
                    })).thenReturn(atrId), "", this, null).external();
                }).onErrorResume(th -> {
                    if (th instanceof TimeoutException) {
                        // A timeout whose only retry reason is KV_COLLECTION_OUTDATED is taken to mean
                        // that the collection has been deleted, so it is dropped from the cleanup set.
                        Set<RetryReason> onlyCollectionOutdated = new HashSet<>();
                        onlyCollectionOutdated.add(RetryReason.KV_COLLECTION_OUTDATED);
                        if (((TimeoutException) th).context().requestContext().retryReasons().equals(onlyCollectionOutdated)) {
                            this.cleanupSet.remove(collectionIdentifier);
                            LOGGER.info("{} stopping cleanup on collection {} as it seems to be deleted", str, collectionIdentifier);
                            return Mono.error(th);
                        }
                    }
                    if (th instanceof ThreadStopRequestedException) {
                        return Mono.error(th);
                    }
                    LOGGER.info("{} lost cleanup thread got error '{}', continuing", str, th);
                    return Mono.empty();
                }).then().thenReturn(Tuples.of(statsByAtr, startRunEvent, Long.valueOf(runStartNanos)));
            }).doOnNext(run -> {
                this.core.context().environment().eventBus().publish(new TransactionCleanupEndRunEvent(run.getT2(), run.getT1(), Duration.ofNanos(System.nanoTime() - run.getT3().longValue())));
            }).doOnNext(run -> {
                windowSpan.get().finish();
            }).doOnError(th -> {
                windowSpan.get().finish(th);
            }).retryWhen(Retry.allBut(ThreadStopRequestedException.class, AccessErrorException.class).exponentialBackoff(Duration.ofMillis(Math.min(1000L, this.config.cleanupConfig().cleanupWindow().toMillis())), this.config.cleanupConfig().cleanupWindow()).doOnRetry(retryContext -> {
                LOGGER.debug("{} retrying lost cleanup on error {} after {}", str, DebugUtil.dbg(retryContext.exception()), retryContext.backoff());
            }).toReactorRetry()).repeat().then();
        });
    }

    /**
     * Checks the stop flag at subscription time.
     *
     * @param collectionIdentifier the collection whose cleanup task is asking; used for logging only
     * @return an empty Mono while running, or an error Mono carrying a
     *         {@code ThreadStopRequestedException} once a stop has been requested
     */
    private Mono<Void> checkIfThreadStopped(CollectionIdentifier collectionIdentifier) {
        return Mono.defer(() -> {
            if (this.stop) {
                LOGGER.info("{} Stopping background cleanup thread for lost transactions on {}", this.bp, collectionIdentifier);
                return Mono.error(new ThreadStopRequestedException());
            }
            return Mono.empty();
        });
    }
}
