package com.couchbase.client.core.transaction.cleanup;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.cnc.TracingIdentifiers;
import com.couchbase.client.core.deps.com.fasterxml.jackson.core.JsonProcessingException;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DecodingFailureException;
import com.couchbase.client.core.error.EncodingFailureException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.CodecFlags;
import com.couchbase.client.core.msg.kv.SubdocCommandType;
import com.couchbase.client.core.msg.kv.SubdocGetRequest;
import com.couchbase.client.core.msg.kv.SubdocMutateRequest;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.transaction.CoreTransactionsReactive;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecord;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import com.couchbase.client.core.transaction.error.internal.ErrorClass;
import com.couchbase.client.core.transaction.support.OptionsUtil;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.support.SpanWrapperUtil;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.TransactionKVHandler;
import com.couchbase.client.core.util.Bytes;
import de.lmu.ifi.dbs.elki.data.HierarchicalClassLabel;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.compress.harmony.pack200.PackingOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
 * Reads and maintains the per-collection "client record" document
 * ({@value #CLIENT_RECORD_DOC_ID}), in which each transactions cleanup client
 * registers itself under {@code records.clients.<clientUuid>}.
 *
 * <p>The {@code before*} methods are overridable hooks that are chained in
 * front of the corresponding KV operation; their default implementations emit
 * a single value and do nothing else.
 */
@Stability.Internal
public class ClientRecord {
    /** Key of the client record document. */
    public static final String CLIENT_RECORD_DOC_ID = "_txn:client-record";
    public static final String FIELD_RECORDS = "records";
    public static final String FIELD_CLIENTS = "clients";
    public static final String FIELD_OVERRIDE = "override";
    public static final String FIELD_OVERRIDE_ENABLED = "enabled";
    public static final String FIELD_OVERRIDE_EXPIRES = "expires";

    private static final String FIELD_HEARTBEAT = "heartbeat_ms";
    private static final String FIELD_EXPIRES = "expires_ms";
    private static final String FIELD_NUM_ATRS = "num_atrs";
    private static final String FIELD_HOST = "host";
    private static final String FIELD_IMPLEMENTATION = "implementation";
    private static final String FIELD_VERSION = "version";
    private static final String FIELD_PROCESS_ID = "process_id";

    /** Sub-document path of the object holding all client entries ("records.clients"). */
    private static final String CLIENTS_PATH = FIELD_RECORDS + "." + FIELD_CLIENTS;
    /** Prefix of the sub-document path of a single client entry ("records.clients."). */
    private static final String CLIENTS_PATH_PREFIX = CLIENTS_PATH + ".";

    /** Added to the configured cleanup window when writing a client's {@code expires_ms}. */
    private static final int SAFETY_MARGIN_EXPIRY_MILLIS = 20000;
    /** Divisor used to convert the parsed HLC "now" from nanoseconds to milliseconds. */
    private static final long NANOS_PER_MILLI = 1_000_000L;
    /**
     * Upper bound on the number of specs placed in one client record update.
     * NOTE(review): presumably mirrors the server's per-request sub-document spec limit - confirm.
     */
    private static final int MAX_SUBDOC_SPECS = 16;

    private static final Logger LOGGER = LoggerFactory.getLogger(CoreTransactionsCleanup.CATEGORY_CLIENT_RECORD);
    /** Default overall timeout for removing a client from one collection's record. */
    private static final Duration TIMEOUT = Duration.ofMillis(500);
    private static final Duration BACKOFF_START = Duration.ofMillis(10);
    private static final Duration BACKOFF_END = Duration.ofMillis(250);

    private final Core core;

    /**
     * @param core the core used for all KV operations; must not be null
     * @throws NullPointerException if {@code core} is null
     */
    public ClientRecord(Core core) {
        this.core = Objects.requireNonNull(core, "core");
    }

    /**
     * Removes the client from the client record of every given collection,
     * using the default per-collection timeout of 500ms.
     *
     * @param clientUuid  id of the client entry to remove
     * @param collections collections whose client record should be updated
     * @return a flux that completes once all collections have been processed
     */
    public Flux<Void> removeClientFromClientRecord(String clientUuid, Set<CollectionIdentifier> collections) {
        return removeClientFromClientRecord(clientUuid, TIMEOUT, collections);
    }

    /**
     * Removes the client from the client record of every given collection, one
     * collection at a time, on the cleanup scheduler.
     *
     * @param clientUuid  id of the client entry to remove
     * @param timeout     overall time allowed per collection, including retries
     * @param collections collections whose client record should be updated
     * @return a flux that completes once all collections have been processed, or
     *         errors if a removal fails or times out
     */
    public Flux<Void> removeClientFromClientRecord(String clientUuid, Duration timeout, Set<CollectionIdentifier> collections) {
        return Flux.fromIterable(collections)
                .subscribeOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup())
                .doOnNext(collection -> LOGGER.info("{} removing from client record on collection {}",
                        clientUuid, RedactableArgument.redactUser(collection)))
                .concatMap(collection -> removeClientFromCollection(clientUuid, timeout, collection));
    }

    /**
     * Deletes {@code records.clients.<clientUuid>} from one collection's client record.
     * A missing document or missing entry counts as success; any other error is
     * retried with exponential backoff until {@code timeout} elapses.
     */
    private Mono<Void> removeClientFromCollection(String clientUuid, Duration timeout, CollectionIdentifier collection) {
        return beforeRemoveClient(this)
                .then(TransactionKVHandler.mutateIn(this.core, collection, CLIENT_RECORD_DOC_ID, mutatingTimeout(),
                        false, false, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS, Optional.empty(),
                        OptionsUtil.createClientContext("Cleaner::removeClientFromCleanupSet"), null,
                        Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE,
                                CLIENTS_PATH_PREFIX + clientUuid, Bytes.EMPTY_BYTE_ARRAY, false, true, false, 0))))
                .onErrorResume(err -> {
                    switch (ErrorClass.classify(err)) {
                        case FAIL_DOC_NOT_FOUND:
                            LOGGER.info("{}/{} remove skipped as client record does not exist",
                                    RedactableArgument.redactUser(collection), clientUuid);
                            return Mono.empty();
                        case FAIL_PATH_NOT_FOUND:
                            LOGGER.info("{}/{} remove skipped as client record entry does not exist",
                                    RedactableArgument.redactUser(collection), clientUuid);
                            return Mono.empty();
                        default:
                            LOGGER.info("{}/{} got error while removing client from client record: {}",
                                    RedactableArgument.redactUser(collection), clientUuid, DebugUtil.dbg(err));
                            return Mono.error(err);
                    }
                })
                .retryWhen(Retry.any()
                        .exponentialBackoff(BACKOFF_START, BACKOFF_END)
                        .doOnRetry(retryContext -> LOGGER.info("{}/{} retrying removing client from record on error {}",
                                RedactableArgument.redactUser(collection), clientUuid, DebugUtil.dbg(retryContext.exception())))
                        .toReactorRetry())
                .timeout(timeout)
                .doOnNext(response -> LOGGER.info("{}/{} removed from client record",
                        RedactableArgument.redactUser(collection), clientUuid))
                .doOnError(err -> LOGGER.info("got error while removing client record '{}'", String.valueOf(err)))
                .then();
    }

    private Duration mutatingTimeout() {
        return this.core.context().environment().timeoutConfig().kvDurableTimeout();
    }

    private Duration nonMutatingTimeout() {
        return this.core.context().environment().timeoutConfig().kvTimeout();
    }

    /**
     * Parses the result of {@link #getClientRecord} into a summary of the
     * registered clients, as seen from the point of view of {@code clientUuid}.
     *
     * <p>A client other than {@code clientUuid} is counted as expired when the
     * time since its heartbeat is at least its {@code expires_ms}; this client
     * itself is always counted as active, whether or not it is in the record yet.
     *
     * @param clientRecord lookup result; field 0 must hold the {@code records} JSON
     *                     and field 1 the {@code $vbucket.HLC} JSON
     * @param clientUuid   id of the calling client
     * @return the parsed details
     * @throws DecodingFailureException if either field cannot be read as JSON
     */
    public static ClientRecordDetails parseClientRecord(CoreSubdocGetResult clientRecord, String clientUuid) {
        try {
            JsonNode records = (JsonNode) Mapper.reader().readValue(clientRecord.field(0).value(), JsonNode.class);
            JsonNode hlcRaw = (JsonNode) Mapper.reader().readValue(clientRecord.field(1).value(), JsonNode.class);
            ActiveTransactionRecord.ParsedHLC hlc = new ActiveTransactionRecord.ParsedHLC(hlcRaw);
            JsonNode clients = records.get(FIELD_CLIENTS);

            List<String> expiredClientIds = new ArrayList<>();
            List<String> activeClientIds = new ArrayList<>();

            long nowMillis = hlc.nowInNanos() / NANOS_PER_MILLI;
            Iterator<String> clientIds = clients.fieldNames();
            while (clientIds.hasNext()) {
                String otherClientId = clientIds.next();
                JsonNode client = clients.get(otherClientId);
                long heartbeatMillis = ActiveTransactionRecord.parseMutationCAS(client.get(FIELD_HEARTBEAT).textValue());
                long expiresMillis = client.get(FIELD_EXPIRES).intValue();
                boolean hasExpired = nowMillis - heartbeatMillis >= expiresMillis;

                // This client is about to refresh its own entry, so it never treats itself as expired.
                if (hasExpired && !otherClientId.equals(clientUuid)) {
                    expiredClientIds.add(otherClientId);
                } else {
                    activeClientIds.add(otherClientId);
                }
            }

            if (!activeClientIds.contains(clientUuid)) {
                activeClientIds.add(clientUuid);
            }

            // The position of this client among the sorted active ids is reported to the caller.
            List<String> sortedActiveClientIds = activeClientIds.stream().sorted().collect(Collectors.toList());
            int indexOfThisClient = sortedActiveClientIds.indexOf(clientUuid);
            int numExpiredClients = expiredClientIds.size();
            int numActiveClients = sortedActiveClientIds.size();
            int numExistingClients = numExpiredClients + numActiveClients;
            boolean thisClientAlreadyExists = clients.has(clientUuid);

            boolean overrideEnabled = false;
            long overrideExpires = 0;
            JsonNode override = records.get(FIELD_OVERRIDE);
            if (override != null) {
                overrideEnabled = override.get(FIELD_OVERRIDE_ENABLED).asBoolean();
                overrideExpires = override.get(FIELD_OVERRIDE_EXPIRES).asLong();
            }

            return new ClientRecordDetails(numActiveClients, indexOfThisClient, !thisClientAlreadyExists,
                    expiredClientIds, numExistingClients, numExpiredClients, overrideEnabled, overrideExpires,
                    hlc.nowInNanos());
        } catch (IOException e) {
            throw new DecodingFailureException(e);
        }
    }

    /**
     * Fetches the {@code records} and {@code $vbucket.HLC} paths of the client
     * record document in one lookup; they are returned as fields 0 and 1.
     *
     * @param collectionIdentifier collection holding the client record
     * @param spanWrapper          optional parent span for tracing
     */
    public Mono<CoreSubdocGetResult> getClientRecord(CollectionIdentifier collectionIdentifier, @Nullable SpanWrapper spanWrapper) {
        return TransactionKVHandler.lookupIn(this.core, collectionIdentifier, CLIENT_RECORD_DOC_ID, nonMutatingTimeout(),
                false, OptionsUtil.createClientContext("ClientRecord::getClientRecord"), spanWrapper, false,
                Arrays.asList(
                        new SubdocGetRequest.Command(SubdocCommandType.GET, FIELD_RECORDS, true, 0),
                        new SubdocGetRequest.Command(SubdocCommandType.GET, "$vbucket.HLC", true, 1)));
    }

    private RequestTracer tracer() {
        return this.core.context().coreResources().requestTracer();
    }

    /**
     * Reads the client record of a collection, then (unless an override is
     * active) upserts this client's entry with a fresh heartbeat and deletes as
     * many expired client entries as fit into the same request.
     *
     * <p>If the client record document does not exist it is created and the
     * whole operation is attempted again. A {@code NO_ACCESS} response is
     * surfaced as {@link AccessErrorException}.
     *
     * @param clientUuid  id of the calling client
     * @param collection  collection holding the client record
     * @param config      supplies the cleanup window and number of ATRs written to the entry
     * @param parentSpan  optional parent span for tracing
     * @return the details parsed from the record as it was before this update
     */
    public Mono<ClientRecordDetails> processClient(String clientUuid, CollectionIdentifier collection, CoreTransactionsConfig config, @Nullable SpanWrapper parentSpan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(null, tracer(), collection, CLIENT_RECORD_DOC_ID,
                            TracingIdentifiers.TRANSACTION_CLEANUP_CLIENT, parentSpan)
                    .attribute(TracingIdentifiers.ATTR_TRANSACTION_CLEANUP_CLIENT_ID, clientUuid);
            String logPrefix = logPrefix(collection, clientUuid);

            return beforeGetRecord(this)
                    .then(getClientRecord(collection, span))
                    .flatMap(result -> updateClientRecord(result, clientUuid, collection, config, span, logPrefix))
                    .onErrorResume(err -> {
                        ErrorClass errorClass = ErrorClass.classify(err);
                        LOGGER.debug("{} got error processing client record: {}", logPrefix, DebugUtil.dbg(err));
                        if (errorClass == ErrorClass.FAIL_DOC_NOT_FOUND) {
                            return createClientRecord(clientUuid, collection, span)
                                    .then(processClient(clientUuid, collection, config, parentSpan));
                        }
                        if (isNoAccess(err)) {
                            return Mono.error(new AccessErrorException());
                        }
                        return Mono.error(err);
                    })
                    // NOTE(review): on error both callbacks below run, so the span is finished twice;
                    // confirm SpanWrapper.finish tolerates repeated calls.
                    .doOnError(err -> span.finish(err))
                    .doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Second half of {@link #processClient}: parses the fetched record and, unless
     * an override is active, writes this client's entry and prunes expired ones.
     */
    private Mono<ClientRecordDetails> updateClientRecord(CoreSubdocGetResult result, String clientUuid, CollectionIdentifier collection,
                                                         CoreTransactionsConfig config, SpanWrapper span, String logPrefix) {
        ClientRecordDetails details = parseClientRecord(result, clientUuid);
        LOGGER.debug("{} found {} existing clients including this ({} active, {} expired), included this={}, index of this={}, override={enabled={},expires={},now={},active={}}",
                logPrefix,
                details.numExistingClients(),
                details.numActiveClients(),
                details.numExpiredClients(),
                !details.clientIsNew(),
                details.indexOfThisClient(),
                details.overrideEnabled(),
                details.overrideExpires(),
                details.casNow(),
                details.overrideActive());

        if (details.overrideActive()) {
            // While an override is active the record is left untouched.
            return Mono.just(details);
        }

        String clientPath = CLIENTS_PATH_PREFIX + clientUuid;
        String host = localHostAddress();
        long processId = currentProcessId();

        List<SubdocMutateRequest.Command> specs = new ArrayList<>();
        specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, clientPath,
                Mapper.encodeAsBytes(Mapper.createObjectNode()
                        .put(FIELD_EXPIRES, config.cleanupConfig().cleanupWindow().toMillis() + SAFETY_MARGIN_EXPIRY_MILLIS)
                        .put(FIELD_NUM_ATRS, config.numAtrs())
                        .put(FIELD_IMPLEMENTATION, "java")
                        .put(FIELD_VERSION, CoreTransactionsReactive.class.getPackage().getImplementationVersion())
                        .put(FIELD_HOST, host)
                        .put(FIELD_PROCESS_ID, processId)),
                true, true, false, 0));
        specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, clientPath + "." + FIELD_HEARTBEAT,
                encodeMutationCasMacro(), false, true, true, 1));

        // Leave room for the trailing SET_DOC spec added below.
        details.expiredClientIds().stream()
                .limit(MAX_SUBDOC_SPECS - specs.size() - 1)
                .forEach(expiredClientId -> {
                    LOGGER.debug("{} removing expired client {}", logPrefix, expiredClientId);
                    specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DELETE,
                            CLIENTS_PATH_PREFIX + expiredClientId, null, false, true, false, specs.size()));
                });

        specs.add(new SubdocMutateRequest.Command(SubdocCommandType.SET_DOC, "", new byte[]{0}, false, false, false, specs.size()));

        return beforeUpdateRecord(this)
                .then(TransactionKVHandler.mutateIn(this.core, collection, CLIENT_RECORD_DOC_ID, mutatingTimeout(),
                        false, false, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS, Optional.empty(),
                        OptionsUtil.createClientContext("ClientRecord::processClient"), span, specs))
                .thenReturn(details);
    }

    /**
     * Creates the client record document with an empty {@code records.clients}
     * object. If another client created it concurrently this completes normally.
     */
    private Mono<Void> createClientRecord(String clientUuid, CollectionIdentifier collection, SpanWrapper span) {
        String logPrefix = logPrefix(collection, clientUuid);
        return beforeCreateRecord(this)
                .then(TransactionKVHandler.mutateIn(this.core, collection, CLIENT_RECORD_DOC_ID, mutatingTimeout(),
                        true, false, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS, Optional.empty(),
                        OptionsUtil.createClientContext("ClientRecord::createClientRecord"), span,
                        Arrays.asList(
                                new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, CLIENTS_PATH,
                                        "{}".getBytes(StandardCharsets.UTF_8), false, true, false, 0),
                                new SubdocMutateRequest.Command(SubdocCommandType.SET_DOC, "",
                                        new byte[]{0}, false, false, false, 1))))
                .doOnSubscribe(subscription ->
                        LOGGER.debug("{} found client record does not exist, creating and retrying", logPrefix))
                .onErrorResume(err -> {
                    if (ErrorClass.classify(err) == ErrorClass.FAIL_DOC_ALREADY_EXISTS) {
                        LOGGER.debug("{} found client record exists after retry, another client must have created it, continuing", logPrefix);
                        return Mono.empty();
                    }
                    if (isNoAccess(err)) {
                        return Mono.error(new AccessErrorException());
                    }
                    LOGGER.info("got error while creating client record '{}'", String.valueOf(err));
                    return Mono.error(err);
                })
                .then();
    }

    /** Builds the "bucket/scope/collection/clientUuid" prefix used in log lines. */
    private static String logPrefix(CollectionIdentifier collection, String clientUuid) {
        return collection.bucket()
                + "/" + collection.scope().orElse("-")
                + "/" + collection.collection().orElse("-")
                + "/" + clientUuid;
    }

    /**
     * Returns true if the error is a {@link CouchbaseException} whose context
     * reports {@code NO_ACCESS}. An exception without a context is not a match.
     */
    private static boolean isNoAccess(Throwable err) {
        if (!(err instanceof CouchbaseException)) {
            return false;
        }
        CouchbaseException couchbaseException = (CouchbaseException) err;
        return couchbaseException.context() != null
                && couchbaseException.context().responseStatus() == ResponseStatus.NO_ACCESS;
    }

    /** Best-effort lookup of the local host address; "unavailable" if it cannot be resolved. */
    private static String localHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (Throwable err) {
            // The host is informational only, so a failed lookup must never fail the update.
            LOGGER.debug("Discarding error {} while trying to resolve local host address", err.getMessage());
            return "unavailable";
        }
    }

    /** Best-effort parse of the PID from the runtime MX bean name ("pid@host"); 0 if it cannot be parsed. */
    private static long currentProcessId() {
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
        try {
            return Long.parseLong(runtimeName.split("@")[0]);
        } catch (Throwable err) {
            LOGGER.debug("Discarding error {} while trying to parse PID {}", err.getMessage(), runtimeName);
            return 0;
        }
    }

    /** JSON-encodes the "${Mutation.CAS}" macro string written as the heartbeat value. */
    private static byte[] encodeMutationCasMacro() {
        try {
            return Mapper.writer().writeValueAsBytes("${Mutation.CAS}");
        } catch (JsonProcessingException e) {
            throw new EncodingFailureException(e);
        }
    }

    /** Hook chained before the client record document is created. */
    protected Mono<Integer> beforeCreateRecord(ClientRecord clientRecord) {
        return Mono.just(1);
    }

    /** Hook chained before a client entry is removed. */
    protected Mono<Integer> beforeRemoveClient(ClientRecord clientRecord) {
        return Mono.just(1);
    }

    /** Hook that is not invoked anywhere in this class. */
    @Deprecated
    protected Mono<Integer> beforeUpdateCAS(ClientRecord clientRecord) {
        return Mono.just(1);
    }

    /** Hook chained before the client record is fetched. */
    protected Mono<Integer> beforeGetRecord(ClientRecord clientRecord) {
        return Mono.just(1);
    }

    /** Hook chained before the client record is updated. */
    protected Mono<Integer> beforeUpdateRecord(ClientRecord clientRecord) {
        return Mono.just(1);
    }
}
