package com.couchbase.connect.kafka;

import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.client.dcp.metrics.LogLevel;
import com.couchbase.connect.kafka.config.common.LoggingConfig;
import com.couchbase.connect.kafka.handler.source.CouchbaseSourceRecord;
import com.couchbase.connect.kafka.util.ConnectHelper;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Logs the milestones a Couchbase document change passes through on its way
 * to a Kafka topic.
 *
 * <p>Each milestone is written as a single log message built from an
 * insertion-ordered map of details. The level used is chosen once, at
 * construction time: {@code INFO} when
 * {@link LoggingConfig#logDocumentLifecycle()} returns {@code true},
 * otherwise {@code DEBUG}. Every logging method is a no-op when that level is
 * not enabled for this class's logger.
 */
public class SourceDocumentLifecycle {
    private static final Logger log = LoggerFactory.getLogger(SourceDocumentLifecycle.class);

    /** Task id taken from the logging context when this instance is created; {@code "?"} if absent. */
    private final String taskId = ConnectHelper.getTaskIdFromLoggingContext().orElse("?");

    /** Level at which all milestone messages are emitted. */
    private final LogLevel logLevel;

    /** The lifecycle milestones that can be logged; the constant is written under the {@code "milestone"} key. */
    public enum Milestone {
        /** Logged by {@code logReceivedFromCouchbase}. */
        RECEIVED_FROM_COUCHBASE,
        /** Logged by {@code logSkippedBecauseFilterSaysIgnore}. */
        SKIPPED_BECAUSE_FILTER_SAYS_IGNORE,
        /** Logged by {@code logSkippedBecauseHandlerSaysIgnore}. */
        SKIPPED_BECAUSE_HANDLER_SAYS_IGNORE,
        /** Logged by {@code logConvertedToKafkaRecord}. */
        CONVERTED_TO_KAFKA_RECORD,
        /** Logged by {@code logCommittedToKafkaTopic}. */
        COMMITTED_TO_TOPIC
    }

    /**
     * Reports whether milestone messages would actually be written.
     *
     * @return {@code true} if the configured level is enabled for this class's logger
     */
    public boolean enabled() {
        return this.logLevel.isEnabled(log);
    }

    /**
     * Static factory for a lifecycle logger.
     *
     * @param loggingConfig configuration that selects the log level
     * @return a new instance
     */
    public static SourceDocumentLifecycle create(LoggingConfig loggingConfig) {
        return new SourceDocumentLifecycle(loggingConfig);
    }

    private SourceDocumentLifecycle(LoggingConfig loggingConfig) {
        this.logLevel = loggingConfig.logDocumentLifecycle() ? LogLevel.INFO : LogLevel.DEBUG;
        log.info("Logging document lifecycle milestones to this category at {} level", this.logLevel);
    }

    /**
     * Logs {@link Milestone#RECEIVED_FROM_COUCHBASE} together with details read
     * from the change (revision, type, partition, sequence number, content
     * size) and the microseconds elapsed between the change's timestamp and now.
     *
     * @param documentChange the change that was received
     */
    public void logReceivedFromCouchbase(DocumentChange documentChange) {
        if (!enabled()) {
            return;
        }

        // Typed locals pin the boxed type (Long / Integer) of each map value.
        final long revision = documentChange.getRevision();
        final String type = documentChange.isMutation() ? "mutation" : "deletion";
        final int partition = documentChange.getVbucket();
        final long sequenceNumber = documentChange.getOffset().getSeqno();
        final int sizeInBytes = documentChange.getContent().length;
        final long microsSinceChange = documentChange.getTimestamp().until(Instant.now(), ChronoUnit.MICROS);

        Map<String, Object> details = new LinkedHashMap<>();
        details.put("connectTaskId", this.taskId);
        details.put("revision", revision);
        details.put("type", type);
        details.put("partition", partition);
        details.put("sequenceNumber", sequenceNumber);
        details.put("sizeInBytes", sizeInBytes);
        details.put("usSinceCouchbaseChange(might be inaccurate before Couchbase 7)", microsSinceChange);
        logMilestone(documentChange, Milestone.RECEIVED_FROM_COUCHBASE, details);
    }

    /**
     * Logs {@link Milestone#SKIPPED_BECAUSE_FILTER_SAYS_IGNORE} for the change.
     *
     * @param documentChange the change that was skipped
     */
    public void logSkippedBecauseFilterSaysIgnore(DocumentChange documentChange) {
        logMilestone(documentChange, Milestone.SKIPPED_BECAUSE_FILTER_SAYS_IGNORE);
    }

    /**
     * Logs {@link Milestone#SKIPPED_BECAUSE_HANDLER_SAYS_IGNORE} for the change.
     *
     * @param documentChange the change that was skipped
     */
    public void logSkippedBecauseHandlerSaysIgnore(DocumentChange documentChange) {
        logMilestone(documentChange, Milestone.SKIPPED_BECAUSE_HANDLER_SAYS_IGNORE);
    }

    /**
     * Logs {@link Milestone#CONVERTED_TO_KAFKA_RECORD} together with the
     * record's topic, key, Kafka partition, source partition and source offset.
     *
     * @param documentChange the change the record was built from
     * @param sourceRecord the resulting Kafka Connect record
     */
    public void logConvertedToKafkaRecord(DocumentChange documentChange, SourceRecord sourceRecord) {
        if (!enabled()) {
            return;
        }

        Map<String, Object> details = new LinkedHashMap<>();
        details.put("topic", sourceRecord.topic());
        details.put("key", sourceRecord.key());
        details.put("kafkaPartition", sourceRecord.kafkaPartition());
        details.put("sourcePartition", sourceRecord.sourcePartition());
        details.put("sourceOffset", sourceRecord.sourceOffset());
        logMilestone(documentChange, Milestone.CONVERTED_TO_KAFKA_RECORD, details);
    }

    /**
     * Logs {@link Milestone#COMMITTED_TO_TOPIC} for the record, with no extra details.
     *
     * @param couchbaseSourceRecord the record whose tracing token and document id are logged
     */
    public void logCommittedToKafkaTopic(CouchbaseSourceRecord couchbaseSourceRecord) {
        logMilestone(couchbaseSourceRecord, Milestone.COMMITTED_TO_TOPIC, Collections.emptyMap());
    }

    /** Logs a milestone for a document change with no extra details. */
    private void logMilestone(DocumentChange documentChange, Milestone milestone) {
        logMilestone(documentChange, milestone, Collections.emptyMap());
    }

    /** Logs a milestone identified by the record's tracing token and Couchbase document id. */
    private void logMilestone(CouchbaseSourceRecord couchbaseSourceRecord, Milestone milestone, Map<String, Object> map) {
        // Guard here so the record's getters are not invoked when logging is disabled.
        if (enabled()) {
            logMilestone(
                    milestone,
                    couchbaseSourceRecord.getTracingToken(),
                    couchbaseSourceRecord.getCouchbaseDocumentId(),
                    map);
        }
    }

    /** Logs a milestone identified by the change's tracing token and qualified key. */
    private void logMilestone(DocumentChange documentChange, Milestone milestone, Map<String, Object> map) {
        // Guard here so the change's getters are not invoked when logging is disabled.
        if (enabled()) {
            logMilestone(
                    milestone,
                    documentChange.getTracingToken(),
                    documentChange.getQualifiedKey(),
                    map);
        }
    }

    /**
     * Builds the message map shared by both milestone overloads and logs it.
     * The common keys come first ({@code milestone}, {@code tracingToken},
     * {@code documentId}), followed by the caller's extra details in their
     * own iteration order. Callers are responsible for the {@link #enabled()} check.
     */
    private void logMilestone(Milestone milestone, long tracingToken, Object documentId, Map<String, Object> extraDetails) {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("milestone", milestone);
        message.put("tracingToken", tracingToken);
        message.put("documentId", documentId);
        message.putAll(extraDetails);
        doLog(message);
    }

    /**
     * Writes the object at the configured level, encoded as a string by
     * {@link Mapper}. If encoding throws, the object's {@code toString()} is
     * logged instead so the milestone is not lost.
     */
    private void doLog(Object obj) {
        try {
            this.logLevel.log(log, Mapper.encodeAsString(obj));
        } catch (Exception e) {
            // Best-effort fallback: an unencodable detail must not prevent the milestone from being logged.
            this.logLevel.log(log, obj.toString());
        }
    }
}
