package com.couchbase.connect.kafka;

import com.couchbase.client.core.logging.LogRedaction;
import com.couchbase.client.core.logging.RedactionLevel;
import com.couchbase.client.core.util.CbStrings;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceTaskConfig;
import com.couchbase.connect.kafka.filter.Filter;
import com.couchbase.connect.kafka.handler.source.CollectionMetadata;
import com.couchbase.connect.kafka.handler.source.DocumentEvent;
import com.couchbase.connect.kafka.handler.source.SourceHandler;
import com.couchbase.connect.kafka.handler.source.SourceHandlerParams;
import com.couchbase.connect.kafka.handler.source.SourceRecordBuilder;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask.class */
public class CouchbaseSourceTask extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSourceTask.class);
    private static final long STOP_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
    private String connectorName;
    private CouchbaseReader couchbaseReader;
    private BlockingQueue<DocumentChange> queue;
    private BlockingQueue<Throwable> errorQueue;
    private String topic;
    private String bucket;
    private volatile boolean running;
    private Filter filter;
    private SourceHandler sourceHandler;
    private int batchSizeMax;
    private boolean connectorNameInOffsets;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.connect.kafka.CouchbaseSourceTask$1, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$couchbase$client$core$logging$RedactionLevel = new int[RedactionLevel.values().length];

        static {
            try {
                $SwitchMap$com$couchbase$client$core$logging$RedactionLevel[RedactionLevel.FULL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$couchbase$client$core$logging$RedactionLevel[RedactionLevel.NONE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$couchbase$client$core$logging$RedactionLevel[RedactionLevel.PARTIAL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        this.connectorName = map.get("name");
        try {
            CouchbaseSourceTaskConfig couchbaseSourceTaskConfig = (CouchbaseSourceTaskConfig) ConfigHelper.parse(CouchbaseSourceTaskConfig.class, map);
            if (CbStrings.isNullOrEmpty(this.connectorName)) {
                throw new ConfigException("Connector must have a non-blank 'name' config property.");
            }
            LogRedaction.setRedactionLevel(couchbaseSourceTaskConfig.logRedaction());
            com.couchbase.client.dcp.core.logging.RedactionLevel.set(toDcp(couchbaseSourceTaskConfig.logRedaction()));
            this.filter = (Filter) Utils.newInstance(couchbaseSourceTaskConfig.eventFilter());
            this.sourceHandler = (SourceHandler) Utils.newInstance(couchbaseSourceTaskConfig.sourceHandler());
            this.topic = couchbaseSourceTaskConfig.topic();
            this.bucket = couchbaseSourceTaskConfig.bucket();
            this.connectorNameInOffsets = couchbaseSourceTaskConfig.connectorNameInOffsets();
            this.batchSizeMax = couchbaseSourceTaskConfig.batchSizeMax();
            Short[] boxedShortArray = toBoxedShortArray(couchbaseSourceTaskConfig.partitions());
            Map<Short, SeqnoAndVbucketUuid> readSourceOffsets = readSourceOffsets(boxedShortArray);
            this.running = true;
            this.queue = new LinkedBlockingQueue();
            this.errorQueue = new LinkedBlockingQueue(1);
            this.couchbaseReader = new CouchbaseReader(couchbaseSourceTaskConfig, this.connectorName, this.queue, this.errorQueue, boxedShortArray, readSourceOffsets);
            this.couchbaseReader.start();
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSourceTask due to configuration error", e);
        }
    }

    private com.couchbase.client.dcp.core.logging.RedactionLevel toDcp(RedactionLevel redactionLevel) {
        switch (AnonymousClass1.$SwitchMap$com$couchbase$client$core$logging$RedactionLevel[redactionLevel.ordinal()]) {
            case 1:
                return com.couchbase.client.dcp.core.logging.RedactionLevel.FULL;
            case 2:
                return com.couchbase.client.dcp.core.logging.RedactionLevel.NONE;
            case 3:
                return com.couchbase.client.dcp.core.logging.RedactionLevel.PARTIAL;
            default:
                throw new IllegalArgumentException("Unrecognized redaction level: " + redactionLevel);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        checkErrorQueue();
        DocumentChange poll = this.queue.poll(1L, TimeUnit.SECONDS);
        if (poll == null) {
            LOGGER.debug("Poll returns 0 results");
            return null;
        }
        ArrayList arrayList = new ArrayList();
        try {
            arrayList.add(poll);
            this.queue.drainTo(arrayList, this.batchSizeMax - 1);
            List<SourceRecord> list = (List) arrayList.stream().map(documentChange -> {
                return DocumentEvent.create(documentChange, this.bucket);
            }).filter(documentEvent -> {
                return this.filter.pass(documentEvent);
            }).map(this::convertToSourceRecord).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
            LOGGER.info("Poll returns {} result(s) (filtered out {})", Integer.valueOf(list.size()), Integer.valueOf(arrayList.size() - list.size()));
            arrayList.forEach((v0) -> {
                v0.flowControlAck();
            });
            return list;
        } catch (Throwable th) {
            arrayList.forEach((v0) -> {
                v0.flowControlAck();
            });
            throw th;
        }
    }

    private void checkErrorQueue() throws ConnectException {
        Throwable poll = this.errorQueue.poll();
        if (poll != null) {
            throw new ConnectException(poll);
        }
    }

    private String getDefaultTopic(DocumentEvent documentEvent) {
        CollectionMetadata collectionMetadata = documentEvent.collectionMetadata();
        return this.topic.replace("${bucket}", this.bucket).replace("${scope}", collectionMetadata.scopeName()).replace("${collection}", collectionMetadata.collectionName()).replace("%", "_");
    }

    private SourceRecord convertToSourceRecord(DocumentEvent documentEvent) {
        String defaultTopic = getDefaultTopic(documentEvent);
        SourceRecordBuilder handle = this.sourceHandler.handle(new SourceHandlerParams(documentEvent, defaultTopic));
        if (handle == null) {
            return null;
        }
        return handle.build(sourcePartition(documentEvent.partition()), sourceOffset(documentEvent), defaultTopic);
    }

    public void stop() {
        this.running = false;
        if (this.couchbaseReader != null) {
            this.couchbaseReader.shutdown();
            try {
                this.couchbaseReader.join(STOP_TIMEOUT_MILLIS);
                if (this.couchbaseReader.isAlive()) {
                    LOGGER.error("Reader thread is still alive after shutdown request.");
                }
            } catch (InterruptedException e) {
                LOGGER.error("Interrupted while joining reader thread.", e);
            }
        }
    }

    private Map<Short, SeqnoAndVbucketUuid> readSourceOffsets(Short[] shArr) {
        HashMap hashMap = new HashMap();
        Map offsets = this.context.offsetStorageReader().offsets(sourcePartitions(shArr));
        LOGGER.info("Raw source offsets: {}", offsets);
        for (Map.Entry entry : offsets.entrySet()) {
            Map map = (Map) entry.getKey();
            Map map2 = (Map) entry.getValue();
            if (map2 == null) {
                LOGGER.warn("null offset value for {}", entry.getKey());
            } else {
                hashMap.put(Short.valueOf(Short.parseShort((String) map.get("partition"))), new SeqnoAndVbucketUuid(((Long) map2.get("bySeqno")).longValue(), (Long) map2.get("vbuuid")));
            }
        }
        LOGGER.debug("Partition to saved seqno: {}", hashMap);
        return hashMap;
    }

    private List<Map<String, Object>> sourcePartitions(Short[] shArr) {
        ArrayList arrayList = new ArrayList();
        for (Short sh : shArr) {
            Map<String, Object> sourcePartition = sourcePartition(sh.shortValue());
            System.out.println(sourcePartition);
            arrayList.add(sourcePartition);
        }
        return arrayList;
    }

    private Map<String, Object> sourcePartition(short s) {
        HashMap hashMap = new HashMap(3);
        hashMap.put("bucket", this.bucket);
        hashMap.put("partition", String.valueOf((int) s));
        if (this.connectorNameInOffsets) {
            hashMap.put("connector", this.connectorName);
        }
        return hashMap;
    }

    private static Map<String, Object> sourceOffset(DocumentEvent documentEvent) {
        HashMap hashMap = new HashMap();
        hashMap.put("bySeqno", Long.valueOf(documentEvent.bySeqno()));
        hashMap.put("vbuuid", Long.valueOf(documentEvent.partitionUuid()));
        return hashMap;
    }

    private static Short[] toBoxedShortArray(Collection<String> collection) {
        return (Short[]) collection.stream().map(Short::valueOf).toArray(i -> {
            return new Short[i];
        });
    }
}
