package com.couchbase.connect.kafka;

import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.logging.RedactionLevel;
import com.couchbase.client.dcp.config.CompressionMode;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.connect.kafka.converter.Converter;
import com.couchbase.connect.kafka.dcp.Event;
import com.couchbase.connect.kafka.dcp.Snapshot;
import com.couchbase.connect.kafka.filter.Filter;
import com.couchbase.connect.kafka.handler.source.CouchbaseSourceRecord;
import com.couchbase.connect.kafka.handler.source.DocumentEvent;
import com.couchbase.connect.kafka.handler.source.LegacySourceHandlerAdapter;
import com.couchbase.connect.kafka.handler.source.SourceHandler;
import com.couchbase.connect.kafka.handler.source.SourceHandlerParams;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.DurationParser;
import com.couchbase.connect.kafka.util.config.Password;
import com.couchbase.connect.kafka.util.config.SizeParser;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask.class */
/**
 * Kafka Connect source task that turns events produced by a {@link CouchbaseReader}
 * into {@link SourceRecord}s.
 *
 * <p>{@link #start(Map)} reads the task configuration and starts the reader, which
 * is handed two queues: one for {@link Event}s and one for errors. {@link #poll()}
 * drains the event queue, applies the optional {@link Filter}, and converts each
 * message with the configured {@link SourceHandler}. {@link #stop()} shuts the
 * reader down and releases any buffers still sitting in the event queue.
 */
public class CouchbaseSourceTask extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSourceTask.class);

    /** Maximum time, in milliseconds, that {@link #stop()} waits for the reader to finish. */
    private static final long MAX_TIMEOUT = 10000;

    private CouchbaseSourceConnectorConfig config;
    private Map<String, String> configProperties;
    private CouchbaseReader couchbaseReader;
    private BlockingQueue<Event> queue;
    private BlockingQueue<Throwable> errorQueue;
    private String topic;
    private String bucket;
    private volatile boolean running;
    private Filter filter;
    private SourceHandler sourceHandler;
    private int batchSizeMax;
    private boolean connectorNameInOffsets;

    /**
     * @return the connector version as reported by {@link Version#getVersion()}
     */
    public String version() {
        return Version.getVersion();
    }

    /**
     * Parses the task configuration, restores the saved sequence numbers for the
     * assigned partitions, and starts the {@link CouchbaseReader}.
     *
     * @param map the task configuration properties
     * @throws ConnectException if the configuration is invalid, or if the filter or
     *     handler class cannot be loaded
     */
    public void start(Map<String, String> map) {
        try {
            this.configProperties = map;
            this.config = new CouchbaseSourceTaskConfig(this.configProperties);
            CouchbaseLoggerFactory.setRedactionLevel((RedactionLevel) this.config.getEnum(RedactionLevel.class, CouchbaseSourceConnectorConfig.LOG_REDACTION_CONFIG));

            this.filter = createFilter(this.config.getString(CouchbaseSourceConnectorConfig.EVENT_FILTER_CLASS_CONFIG));
            this.sourceHandler = createHandler(this.config.getString(CouchbaseSourceConnectorConfig.DCP_MESSAGE_CONVERTER_CLASS_CONFIG));
            this.topic = this.config.getString(CouchbaseSourceConnectorConfig.TOPIC_NAME_CONFIG);
            this.bucket = this.config.getString(CouchbaseSourceConnectorConfig.CONNECTION_BUCKET_CONFIG);
            this.connectorNameInOffsets = this.config.getBoolean(CouchbaseSourceConnectorConfig.COMPAT_NAMES_CONFIG);
            this.batchSizeMax = this.config.getInt(CouchbaseSourceConnectorConfig.BATCH_SIZE_MAX_CONFIG);

            String username = this.config.getUsername();
            String password = Password.CONNECTION.get(this.config);
            List<String> clusterAddress = this.config.getList(CouchbaseSourceConnectorConfig.CONNECTION_CLUSTER_ADDRESS_CONFIG);
            boolean useSnapshots = this.config.getBoolean(CouchbaseSourceConnectorConfig.USE_SNAPSHOTS_CONFIG);
            boolean sslEnabled = this.config.getBoolean(CouchbaseSourceConnectorConfig.CONNECTION_SSL_ENABLED_CONFIG);
            String sslKeystoreLocation = this.config.getString(CouchbaseSourceConnectorConfig.CONNECTION_SSL_KEYSTORE_LOCATION_CONFIG);
            String sslKeystorePassword = Password.SSL_KEYSTORE.get(this.config);
            StreamFrom streamFrom = (StreamFrom) this.config.getEnum(StreamFrom.class, CouchbaseSourceConnectorConfig.STREAM_FROM_CONFIG);
            CompressionMode compressionMode = (CompressionMode) this.config.getEnum(CompressionMode.class, CouchbaseSourceConnectorConfig.COMPRESSION_CONFIG);
            long persistencePollingIntervalMillis = DurationParser.parseDuration(this.config.getString(CouchbaseSourceConnectorConfig.PERSISTENCE_POLLING_INTERVAL_CONFIG), TimeUnit.MILLISECONDS);
            // The configured size is clamped to the int range before narrowing.
            int flowControlBufferBytes = (int) Math.min((long) Integer.MAX_VALUE, SizeParser.parseSizeBytes(this.config.getString(CouchbaseSourceConnectorConfig.FLOW_CONTROL_BUFFER_CONFIG)));
            long connectionTimeoutMs = this.config.getLong(CouchbaseSourceConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);

            Short[] partitions = toBoxedShortArray(this.config.getList(CouchbaseSourceTaskConfig.PARTITIONS_CONFIG));
            Map<Short, Long> partitionToSavedSeqno = readSourceOffsets(partitions);

            this.running = true;
            this.queue = new LinkedBlockingQueue<>();
            this.errorQueue = new LinkedBlockingQueue<>(1);
            this.couchbaseReader = new CouchbaseReader(clusterAddress, this.bucket, username, password, connectionTimeoutMs, this.queue, this.errorQueue, partitions, partitionToSavedSeqno, streamFrom, useSnapshots, sslEnabled, sslKeystoreLocation, sslKeystorePassword, compressionMode, persistencePollingIntervalMillis, flowControlBufferBytes);
            this.couchbaseReader.start();
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSourceTask due to configuration error", e);
        }
    }

    /**
     * Instantiates the configured message handler.
     *
     * <p>The class is first loaded as a {@link SourceHandler}. If that fails with a
     * {@link ClassCastException}, it is loaded as a deprecated {@link Converter},
     * wrapped in a {@link LegacySourceHandlerAdapter}, and a warning is logged.
     *
     * @param str fully qualified name of the handler (or legacy converter) class
     * @return the handler to use for converting events
     * @throws ConnectException if the class cannot be found
     */
    private SourceHandler createHandler(String str) {
        try {
            try {
                return Utils.newInstance(str, SourceHandler.class);
            } catch (ClassCastException e) {
                // Kept nested so a ClassNotFoundException from the fallback is
                // still translated by the outer catch.
                SourceHandler adapter = new LegacySourceHandlerAdapter(Utils.newInstance(str, Converter.class));
                LOGGER.warn("Converter class {} implements deprecated {}. Please update the converter to extend {} instead.", str, Converter.class, SourceHandler.class);
                return adapter;
            }
        } catch (ClassNotFoundException e) {
            throw new ConnectException("Couldn't create message handler", e);
        }
    }

    /**
     * Instantiates the configured event filter.
     *
     * @param str fully qualified name of the filter class; may be null or empty
     * @return the filter, or {@code null} when no filter class is configured
     * @throws ConnectException if the class cannot be found
     */
    private Filter createFilter(String str) {
        if (str == null || str.isEmpty()) {
            return null;
        }
        try {
            return Utils.newInstance(str, Filter.class);
        } catch (ClassNotFoundException e) {
            throw new ConnectException("Couldn't create filter in CouchbaseSourceTask due to an error", e);
        }
    }

    /**
     * Collects records from queued events until a batch is complete.
     *
     * <p>Each loop iteration waits up to 100 ms for an event. A non-empty batch is
     * returned as soon as {@code batchSizeMax} events have been consumed, the queue
     * yields nothing within the wait, or the event just processed is a
     * {@link Snapshot}. When no batch is returned, the error queue is checked and
     * any pending error is rethrown wrapped in a {@link ConnectException}.
     *
     * @return the converted records; empty if the task was stopped before any
     *     record was produced
     * @throws InterruptedException if interrupted while waiting for an event
     * @throws ConnectException if the reader reported an error
     */
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> results = new ArrayList<>();
        int eventsRemaining = this.batchSizeMax;
        while (this.running) {
            Event event = this.queue.poll(100L, TimeUnit.MILLISECONDS);
            if (event != null) {
                try {
                    for (ByteBuf message : event) {
                        if (this.filter == null || this.filter.pass(message)) {
                            SourceRecord record = convert(message);
                            if (record != null) {
                                results.add(record);
                            }
                        }
                    }
                    event.ack();
                    eventsRemaining--;
                } finally {
                    // Release exactly once, whether or not processing succeeded.
                    releaseAll(event);
                }
            }
            if (!results.isEmpty() && (eventsRemaining == 0 || event == null || (event instanceof Snapshot))) {
                LOGGER.info("Poll returns {} result(s)", results.size());
                return results;
            }
            Throwable error = this.errorQueue.poll();
            if (error != null) {
                throw new ConnectException(error);
            }
        }
        return results;
    }

    /**
     * Converts a single message buffer into a Kafka {@link SourceRecord} using the
     * configured {@link SourceHandler}.
     *
     * @param byteBuf the message buffer to convert
     * @return the record, or {@code null} if the handler returned {@code null} for
     *     this message; the task's default topic is used when the handler result
     *     has no topic
     */
    public SourceRecord convert(ByteBuf byteBuf) {
        long vBucketUuid = this.couchbaseReader.getVBucketUuid(MessageUtil.getVbucket(byteBuf));
        DocumentEvent docEvent = DocumentEvent.create(byteBuf, this.bucket, vBucketUuid);
        CouchbaseSourceRecord handled = this.sourceHandler.handle(new SourceHandlerParams(docEvent, this.topic));
        if (handled == null) {
            return null;
        }
        String targetTopic = handled.topic() == null ? this.topic : handled.topic();
        return new SourceRecord(
                sourcePartition(docEvent.vBucket()),
                sourceOffset(docEvent.bySeqno()),
                targetTopic,
                handled.kafkaPartition(),
                handled.keySchema(),
                handled.key(),
                handled.valueSchema(),
                handled.value(),
                handled.timestamp());
    }

    /**
     * Stops polling, shuts down the reader, waits up to {@link #MAX_TIMEOUT}
     * milliseconds for it to finish, and releases every event left in the queue.
     *
     * <p>The reader and queue are null-checked so this method does not throw a
     * {@link NullPointerException} when {@link #start(Map)} failed before creating
     * them. Cleanup continues past a failing release; the last such failure is
     * rethrown once the queue has been drained.
     */
    public void stop() {
        this.running = false;

        if (this.couchbaseReader != null) {
            this.couchbaseReader.shutdown();
            try {
                this.couchbaseReader.join(MAX_TIMEOUT);
                if (this.couchbaseReader.isAlive()) {
                    LOGGER.error("Reader thread is still alive after shutdown request.");
                }
            } catch (InterruptedException e) {
                LOGGER.error("Interrupted while joining reader thread.", e);
                // Restore the interrupt flag so the caller can observe it.
                Thread.currentThread().interrupt();
            }
        }

        if (this.queue == null) {
            return;
        }
        LOGGER.info("Releasing unconsumed events: {}", this.queue.size());
        RuntimeException lastFailure = null;
        Event event;
        // Drain rather than iterate so released events do not remain queued.
        while ((event = this.queue.poll()) != null) {
            try {
                releaseAll(event);
            } catch (RuntimeException e) {
                // Already logged per buffer by releaseAll; keep releasing the rest.
                lastFailure = e;
            }
        }
        if (lastFailure != null) {
            throw lastFailure;
        }
    }

    /**
     * Releases every buffer in {@code iterable}. A failure on one buffer is logged
     * and does not prevent the remaining buffers from being released.
     *
     * @param iterable the buffers to release
     * @throws RuntimeException the last failure encountered, if any release failed
     */
    private void releaseAll(Iterable<ByteBuf> iterable) {
        RuntimeException lastFailure = null;
        for (ByteBuf buffer : iterable) {
            try {
                buffer.release();
            } catch (RuntimeException e) {
                LOGGER.warn("Failed to release buffer {}", buffer, e);
                lastFailure = e;
            }
        }
        if (lastFailure != null) {
            throw lastFailure;
        }
    }

    /**
     * Looks up the stored offsets for the given partitions.
     *
     * @param shArr the partition ids assigned to this task
     * @return a map from partition id to the stored {@code bySeqno} value;
     *     partitions without a stored offset are absent
     */
    private Map<Short, Long> readSourceOffsets(Short[] shArr) {
        Map<Short, Long> partitionToSeqno = new HashMap<>();
        Map<Map<String, Object>, Map<String, Object>> offsets = this.context.offsetStorageReader().offsets(sourcePartitions(shArr));
        LOGGER.debug("Raw source offsets: {}", offsets);
        for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : offsets.entrySet()) {
            Map<String, Object> partition = entry.getKey();
            Map<String, Object> offset = entry.getValue();
            if (offset != null) {
                partitionToSeqno.put(Short.valueOf((String) partition.get("partition")), (Long) offset.get("bySeqno"));
            }
        }
        LOGGER.debug("Partition to saved seqno: {}", partitionToSeqno);
        return partitionToSeqno;
    }

    /**
     * Builds the source-partition key for each of the given partition ids.
     *
     * @param shArr the partition ids
     * @return one source-partition map per id, in the same order
     */
    private List<Map<String, Object>> sourcePartitions(Short[] shArr) {
        List<Map<String, Object>> partitions = new ArrayList<>(shArr.length);
        for (Short partitionId : shArr) {
            partitions.add(sourcePartition(partitionId));
        }
        return partitions;
    }

    /**
     * Builds the source-partition key for one partition: the bucket name, the
     * partition id as a string, and, when {@code connectorNameInOffsets} is set,
     * the connector name.
     *
     * @param s the partition id
     * @return the source-partition map
     */
    private Map<String, Object> sourcePartition(short s) {
        Map<String, Object> partition = new HashMap<>(3);
        partition.put("bucket", this.bucket);
        partition.put("partition", String.valueOf((int) s));
        if (this.connectorNameInOffsets) {
            partition.put("connector", this.config.getConnectorName());
        }
        return partition;
    }

    /**
     * @param j the sequence number to record
     * @return a single-entry offset map keyed by {@code "bySeqno"}
     */
    private static Map<String, Object> sourceOffset(long j) {
        return Collections.<String, Object>singletonMap("bySeqno", j);
    }

    /**
     * Parses each string in {@code collection} as a {@link Short}.
     *
     * @param collection decimal partition ids
     * @return the parsed ids, in iteration order
     * @throws NumberFormatException if an element is not a valid short
     */
    private static Short[] toBoxedShortArray(Collection<String> collection) {
        Short[] result = new Short[collection.size()];
        int index = 0;
        for (String value : collection) {
            result[index++] = Short.valueOf(value);
        }
        return result;
    }
}
