package com.couchbase.connect.kafka;

import com.couchbase.client.core.logging.LogRedaction;
import com.couchbase.client.core.logging.RedactionLevel;
import com.couchbase.client.core.util.CbStrings;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceTaskConfig;
import com.couchbase.connect.kafka.filter.Filter;
import com.couchbase.connect.kafka.handler.source.CollectionMetadata;
import com.couchbase.connect.kafka.handler.source.CouchbaseSourceRecord;
import com.couchbase.connect.kafka.handler.source.DocumentEvent;
import com.couchbase.connect.kafka.handler.source.SourceHandler;
import com.couchbase.connect.kafka.handler.source.SourceHandlerParams;
import com.couchbase.connect.kafka.handler.source.SourceRecordBuilder;
import com.couchbase.connect.kafka.util.ScopeAndCollection;
import com.couchbase.connect.kafka.util.TopicMap;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask.class */
public class CouchbaseSourceTask extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSourceTask.class);

    /** How long {@code stop()} waits for the reader thread to finish: 10 seconds, in milliseconds. */
    private static final long STOP_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);

    /** Value of the {@code "name"} task property; must be non-empty (checked in {@code start}). */
    private String connectorName;

    /** Reader created and started in {@code start}, shut down and joined in {@code stop}. */
    private CouchbaseReader couchbaseReader;

    /** Document changes handed to the reader at construction and consumed by {@code poll}. */
    private BlockingQueue<DocumentChange> queue;

    /**
     * Failure hand-off queue (capacity 1) given to the reader; anything found here at the
     * start of {@code poll} is rethrown wrapped in a {@code ConnectException}.
     */
    private BlockingQueue<Throwable> errorQueue;

    /** Topic template; may contain {@code ${bucket}}, {@code ${scope}} and {@code ${collection}}. */
    private String defaultTopicTemplate;

    /** Per-collection topic names that take precedence over the default topic template. */
    private Map<ScopeAndCollection, String> collectionToTopic;

    /** Bucket name from the task configuration; part of every source partition map. */
    private String bucket;

    /** Decides whether a document event is passed on to the source handler. */
    private Filter filter;

    /** Turns a document event into a record builder, or returns null to skip the event. */
    private SourceHandler sourceHandler;

    /** Upper bound on the number of document changes taken from the queue per {@code poll}. */
    private int batchSizeMax;

    /** When true, the connector name is added to each source partition map. */
    private boolean connectorNameInOffsets;

    /** Passed through to the source handler via {@code SourceHandlerParams}. */
    private boolean noValue;

    /** Receives notifications about what happens to each document (converted, skipped, committed). */
    private SourceDocumentLifecycle lifecycle;

    /**
     * When present, every skipped document still yields a record addressed to this topic,
     * carrying the document's source partition and offset.
     */
    private Optional<String> blackHoleTopic;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.connect.kafka.CouchbaseSourceTask$1, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask$1.class */
    /**
     * Switch lookup table for {@link RedactionLevel}: indexed by an enum constant's
     * {@code ordinal()}, each slot holds a small case label (FULL = 1, NONE = 2, PARTIAL = 3).
     * Slots that are never assigned keep the array default of 0.
     *
     * NOTE(review): this looks like the decompiler's rendering of the synthetic switch-map
     * class javac emits for a {@code switch} over an enum - confirm before hand-editing.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$couchbase$client$core$logging$RedactionLevel = new int[RedactionLevel.values().length];

        static {
            // Each assignment is guarded on its own: if a constant cannot be resolved at run time
            // (NoSuchFieldError), only that slot stays unset and the remaining entries are still filled.
            try {
                $SwitchMap$com$couchbase$client$core$logging$RedactionLevel[RedactionLevel.FULL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$couchbase$client$core$logging$RedactionLevel[RedactionLevel.NONE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$couchbase$client$core$logging$RedactionLevel[RedactionLevel.PARTIAL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask$ConversionResult.class */
    /**
     * Immutable outcome of converting one batch of document changes into Kafka records.
     */
    public static class ConversionResult {
        /** Records to hand back to the Kafka Connect framework. */
        public final List<SourceRecord> records;

        /** Number of records reported as published. */
        public final int published;

        /** Number of document changes reported as dropped. */
        public final int dropped;

        /**
         * @param records   records produced for the batch
         * @param published count of published records
         * @param dropped   count of dropped document changes
         */
        public ConversionResult(List<SourceRecord> records, int published, int dropped) {
            this.records = records;
            this.published = published;
            this.dropped = dropped;
        }
    }

    /**
     * Returns the connector version as reported by {@link Version#getVersion()}.
     *
     * @return the version string
     */
    @Override
    public String version() {
        return Version.getVersion();
    }

    /**
     * Configures this task from the given properties and starts the Couchbase reader.
     *
     * <p>In order: parses the task configuration, validates the connector name, applies the
     * log redaction level to both redaction facilities, instantiates and initializes the
     * event filter and source handler, copies the remaining settings into fields, reads the
     * saved source offsets for the assigned partitions, and finally creates and starts the reader.
     *
     * @param map task properties; the {@code "name"} entry must be non-empty
     * @throws ConnectException if a {@link ConfigException} is raised while configuring the task
     */
    @Override
    public void start(Map<String, String> map) {
        this.connectorName = map.get("name");
        try {
            CouchbaseSourceTaskConfig config = (CouchbaseSourceTaskConfig) ConfigHelper.parse(CouchbaseSourceTaskConfig.class, map);
            if (CbStrings.isNullOrEmpty(this.connectorName)) {
                throw new ConfigException("Connector must have a non-blank 'name' config property.");
            }

            // The same redaction level is applied to the core logging and to the DCP logging.
            LogRedaction.setRedactionLevel(config.logRedaction());
            com.couchbase.client.dcp.core.logging.RedactionLevel.set(toDcp(config.logRedaction()));

            // Filter and handler receive a read-only view of the task properties.
            Map<String, String> readOnlyProps = Collections.unmodifiableMap(map);
            this.lifecycle = SourceDocumentLifecycle.create(config);
            this.filter = (Filter) Utils.newInstance(config.eventFilter());
            this.filter.init(readOnlyProps);
            this.sourceHandler = (SourceHandler) Utils.newInstance(config.sourceHandler());
            this.sourceHandler.init(readOnlyProps);

            // A blank black hole topic means the feature is disabled.
            this.blackHoleTopic = Optional.ofNullable(CbStrings.emptyToNull(config.blackHoleTopic().trim()));
            this.defaultTopicTemplate = config.topic();
            this.collectionToTopic = TopicMap.parseCollectionToTopic(config.collectionToTopic());
            this.bucket = config.bucket();
            this.connectorNameInOffsets = config.connectorNameInOffsets();
            this.batchSizeMax = config.batchSizeMax();
            this.noValue = config.noValue();

            List<Integer> partitions = parseInts(config.partitions());
            Map<Integer, SeqnoAndVbucketUuid> savedOffsets = readSourceOffsets(partitions);

            this.queue = new LinkedBlockingQueue<>();
            // Capacity of one: a single pending failure is enough to fail the next poll.
            this.errorQueue = new LinkedBlockingQueue<>(1);
            this.couchbaseReader = new CouchbaseReader(config, this.connectorName, this.queue, this.errorQueue, partitions, savedOffsets, this.lifecycle);
            this.couchbaseReader.start();
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSourceTask due to configuration error", e);
        }
    }

    /**
     * Maps a core {@link RedactionLevel} to the DCP redaction level of the same name.
     *
     * @param redactionLevel the core redaction level; must not be null
     * @return the DCP level with the same name
     * @throws IllegalArgumentException if the level is not FULL, NONE or PARTIAL
     */
    private com.couchbase.client.dcp.core.logging.RedactionLevel toDcp(RedactionLevel redactionLevel) {
        // Switch on the enum itself instead of going through an ordinal-indexed lookup table.
        switch (redactionLevel) {
            case FULL:
                return com.couchbase.client.dcp.core.logging.RedactionLevel.FULL;
            case NONE:
                return com.couchbase.client.dcp.core.logging.RedactionLevel.NONE;
            case PARTIAL:
                return com.couchbase.client.dcp.core.logging.RedactionLevel.PARTIAL;
            default:
                throw new IllegalArgumentException("Unrecognized redaction level: " + redactionLevel);
        }
    }

    /**
     * Returns the next batch of records for Kafka Connect.
     *
     * <p>Fails fast if an error is pending in the error queue. Otherwise waits up to one
     * second for a first document change, then takes whatever else is immediately available,
     * up to {@code batchSizeMax} changes in total, and converts the batch to source records.
     * Every change taken from the queue is flow-control acknowledged, whether or not the
     * conversion succeeds.
     *
     * @return the converted records, or {@code null} if no change arrived within the wait
     * @throws InterruptedException if interrupted while waiting on the queue
     * @throws ConnectException if an error was found in the error queue
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        checkErrorQueue();

        DocumentChange first = this.queue.poll(1L, TimeUnit.SECONDS);
        if (first == null) {
            LOGGER.debug("Poll returns 0 results");
            return null;
        }

        List<DocumentChange> batch = new ArrayList<>();
        try {
            batch.add(first);
            // One slot is already used by the change obtained from the blocking poll above.
            this.queue.drainTo(batch, this.batchSizeMax - 1);

            ConversionResult result = convertToSourceRecords(batch);
            LOGGER.info("Poll returns {} result(s) (filtered out {})", result.published, result.dropped);
            return result.records;
        } finally {
            // Acknowledge on success and on failure alike, exactly once per change.
            batch.forEach(DocumentChange::flowControlAck);
        }
    }

    /**
     * Tells whether the record is addressed to the configured black hole topic.
     *
     * @param sourceRecord the record to inspect
     * @return true only if a black hole topic is configured and equals the record's topic
     */
    private boolean isSourceOffsetUpdate(SourceRecord sourceRecord) {
        return this.blackHoleTopic
                .filter(topic -> topic.equals(sourceRecord.topic()))
                .isPresent();
    }

    /**
     * Notifies the document lifecycle that a record has been committed.
     *
     * <p>Records that are not {@link CouchbaseSourceRecord} instances are only logged at
     * warn level. For the others, the lifecycle callback depends on whether the record was
     * addressed to the black hole topic or to a regular topic.
     *
     * @param sourceRecord the committed record
     */
    public void commitRecord(SourceRecord sourceRecord) {
        if (sourceRecord instanceof CouchbaseSourceRecord) {
            CouchbaseSourceRecord committed = (CouchbaseSourceRecord) sourceRecord;
            if (!isSourceOffsetUpdate(committed)) {
                this.lifecycle.logCommittedToKafkaTopic(committed);
            } else {
                this.lifecycle.logSourceOffsetUpdateCommittedToBlackHoleTopic(committed);
            }
            return;
        }
        LOGGER.warn("Committed a record we didn't create? Record key {}", sourceRecord.key());
    }

    /**
     * Removes the pending failure from the error queue, if any, and rethrows it.
     *
     * @throws ConnectException wrapping the failure taken from the error queue
     */
    private void checkErrorQueue() throws ConnectException {
        final Throwable failure = this.errorQueue.poll();
        if (failure == null) {
            return;
        }
        throw new ConnectException(failure);
    }

    /**
     * Expands the default topic template for the given event.
     *
     * <p>The placeholders {@code ${bucket}}, {@code ${scope}} and {@code ${collection}} are
     * substituted in that order, then every {@code %} in the result is replaced by {@code _}.
     *
     * @param documentEvent event whose collection metadata supplies scope and collection names
     * @return the resulting topic name
     */
    private String getDefaultTopic(DocumentEvent documentEvent) {
        CollectionMetadata metadata = documentEvent.collectionMetadata();
        String topic = this.defaultTopicTemplate.replace("${bucket}", this.bucket);
        topic = topic.replace("${scope}", metadata.scopeName());
        topic = topic.replace("${collection}", metadata.collectionName());
        return topic.replace("%", "_");
    }

    /**
     * Converts a batch of document changes into Kafka source records.
     *
     * <p>A change is skipped when the filter rejects its event or when the source handler
     * produces no record for it. If a black hole topic is configured, each skipped change
     * still contributes a source-offset-update record addressed to that topic.
     *
     * @param list the document changes to convert, in order
     * @return the records to return from {@code poll}, together with the number of records
     *         produced from non-skipped changes ({@code published}) and the number of skipped
     *         changes ({@code dropped}); offset-update records are not counted as published
     */
    private ConversionResult convertToSourceRecords(List<DocumentChange> list) {
        List<SourceRecord> records = new ArrayList<>(list.size());
        int published = 0;
        int dropped = 0;

        for (DocumentChange change : list) {
            DocumentEvent event = DocumentEvent.create(change, this.bucket);

            CouchbaseSourceRecord converted = null;
            if (!this.filter.pass(event)) {
                this.lifecycle.logSkippedBecauseFilterSaysIgnore(change);
            } else {
                converted = convertToSourceRecord(change, event);
                if (converted == null) {
                    this.lifecycle.logSkippedBecauseHandlerSaysIgnore(change);
                }
            }

            if (converted != null) {
                this.lifecycle.logConvertedToKafkaRecord(change, converted);
                records.add(converted);
                published++;
                continue;
            }

            // Skipped for either reason: count it and, if configured, emit an offset update.
            dropped++;
            this.blackHoleTopic.ifPresent(topic -> records.add(createSourceOffsetUpdateRecord(topic, change, event)));
        }

        return new ConversionResult(records, published, dropped);
    }

    /**
     * Builds a record for a skipped document that carries the document's source partition and offset.
     *
     * @param str            topic the record is addressed to
     * @param documentChange the skipped change; its vbucket number forms the key suffix
     * @param documentEvent  event supplying the partition and offset information
     * @return a record keyed {@code "ignored-" + vbucket}
     */
    private SourceRecord createSourceOffsetUpdateRecord(String str, DocumentChange documentChange, DocumentEvent documentEvent) {
        SourceRecordBuilder builder = new SourceRecordBuilder().key("ignored-" + documentChange.getVbucket());
        Map<String, Object> partition = sourcePartition(documentEvent.partition());
        Map<String, Object> offset = sourceOffset(documentEvent);
        return builder.build(documentChange, partition, offset, str);
    }

    /**
     * Asks the source handler to turn one document event into a Kafka record.
     *
     * <p>The topic is the entry configured for the event's scope and collection, or the
     * expanded default topic template when there is no such entry.
     *
     * @param documentChange the change the event was created from
     * @param documentEvent  the event to convert
     * @return the built record, or {@code null} if the handler returned no builder
     */
    private CouchbaseSourceRecord convertToSourceRecord(DocumentChange documentChange, DocumentEvent documentEvent) {
        ScopeAndCollection collectionKey = scopeAndCollection(documentEvent);
        String fallbackTopic = getDefaultTopic(documentEvent);
        String topic = this.collectionToTopic.getOrDefault(collectionKey, fallbackTopic);

        SourceHandlerParams params = new SourceHandlerParams(documentEvent, topic, this.noValue);
        SourceRecordBuilder builder = this.sourceHandler.handle(params);
        return builder == null
                ? null
                : builder.build(documentChange, sourcePartition(documentEvent.partition()), sourceOffset(documentEvent), topic);
    }

    /**
     * Requests reader shutdown and waits up to {@code STOP_TIMEOUT_MILLIS} for it to finish.
     *
     * <p>Does nothing if the reader was never created. If the wait is interrupted, the
     * interruption is logged and the thread's interrupt status is restored so that callers
     * further up the stack can still observe it.
     */
    @Override
    public void stop() {
        if (this.couchbaseReader == null) {
            return;
        }
        this.couchbaseReader.shutdown();
        try {
            this.couchbaseReader.join(STOP_TIMEOUT_MILLIS);
            if (this.couchbaseReader.isAlive()) {
                LOGGER.error("Reader thread is still alive after shutdown request.");
            }
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while joining reader thread.", e);
            // Catching InterruptedException clears the flag; set it again rather than swallow it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Loads the saved source offsets for the given partitions from the offset storage reader.
     *
     * @param collection partition numbers to look up
     * @return map from partition number to its saved sequence number and vbucket UUID;
     *         partitions whose stored offset is null are omitted
     */
    private Map<Integer, SeqnoAndVbucketUuid> readSourceOffsets(Collection<Integer> collection) {
        Map<Integer, SeqnoAndVbucketUuid> partitionToOffset = new HashMap<>();
        Map<Map<String, Object>, Map<String, Object>> rawOffsets =
                this.context.offsetStorageReader().offsets(sourcePartitions(collection));
        LOGGER.debug("Raw source offsets: {}", rawOffsets);

        for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : rawOffsets.entrySet()) {
            Map<String, Object> offset = entry.getValue();
            if (offset == null) {
                // Nothing saved for this partition.
                continue;
            }
            // The partition number is stored as a string in the source partition map.
            int partition = Integer.parseInt((String) entry.getKey().get("partition"));
            long seqno = (Long) offset.get("bySeqno");
            Long vbucketUuid = (Long) offset.get("vbuuid");
            partitionToOffset.put(partition, new SeqnoAndVbucketUuid(seqno, vbucketUuid));
        }

        LOGGER.debug("Partition to saved seqno: {}", partitionToOffset);
        return partitionToOffset;
    }

    /**
     * Builds the source partition map for each of the given partition numbers.
     *
     * @param collection partition numbers
     * @return one source partition map per number, in iteration order
     */
    private List<Map<String, Object>> sourcePartitions(Collection<Integer> collection) {
        List<Map<String, Object>> partitions = new ArrayList<>(collection.size());
        for (Integer partition : collection) {
            partitions.add(sourcePartition(partition));
        }
        return partitions;
    }

    /**
     * Builds the source partition map identifying one partition of the configured bucket.
     *
     * @param i the partition number
     * @return a map with {@code "bucket"} and {@code "partition"} (the number as a string),
     *         plus {@code "connector"} when connector names are included in offsets
     */
    private Map<String, Object> sourcePartition(int i) {
        Map<String, Object> partition = new HashMap<>(3);
        partition.put("bucket", this.bucket);
        partition.put("partition", String.valueOf(i));
        if (this.connectorNameInOffsets) {
            partition.put("connector", this.connectorName);
        }
        return partition;
    }

    /**
     * Builds the source offset map for a document event.
     *
     * @param documentEvent the event whose position is recorded
     * @return a map with {@code "bySeqno"} and {@code "vbuuid"}, both stored as {@code Long}
     */
    private static Map<String, Object> sourceOffset(DocumentEvent documentEvent) {
        Map<String, Object> offset = new HashMap<>();
        // Explicit Long.valueOf keeps both values boxed as Long regardless of the getter types.
        offset.put("bySeqno", Long.valueOf(documentEvent.bySeqno()));
        offset.put("vbuuid", Long.valueOf(documentEvent.partitionUuid()));
        return offset;
    }

    /**
     * Parses each string as a decimal integer.
     *
     * @param collection the strings to parse
     * @return the parsed values, in iteration order
     * @throws NumberFormatException if any element is not a parsable integer
     */
    private static List<Integer> parseInts(Collection<String> collection) {
        return collection.stream()
                .map(Integer::valueOf)
                .collect(Collectors.toList());
    }

    /**
     * Extracts the scope and collection names of a document event as a lookup key.
     *
     * @param documentEvent the event whose collection metadata is read
     * @return the scope/collection pair of the event
     */
    private static ScopeAndCollection scopeAndCollection(DocumentEvent documentEvent) {
        final CollectionMetadata metadata = documentEvent.collectionMetadata();
        final String scope = metadata.scopeName();
        final String collectionName = metadata.collectionName();
        return new ScopeAndCollection(scope, collectionName);
    }
}
