package com.couchbase.connect.kafka;

import com.couchbase.client.core.env.CouchbaseThreadFactory;
import com.couchbase.client.core.logging.LogRedaction;
import com.couchbase.client.core.logging.RedactionLevel;
import com.couchbase.client.core.util.CbStrings;
import com.couchbase.client.core.util.NanoTimestamp;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.client.dcp.util.PartitionSet;
import com.couchbase.connect.kafka.config.source.CouchbaseSourceTaskConfig;
import com.couchbase.connect.kafka.filter.AllPassFilter;
import com.couchbase.connect.kafka.filter.Filter;
import com.couchbase.connect.kafka.handler.source.CollectionMetadata;
import com.couchbase.connect.kafka.handler.source.CouchbaseHeaderSetter;
import com.couchbase.connect.kafka.handler.source.CouchbaseSourceRecord;
import com.couchbase.connect.kafka.handler.source.DocumentEvent;
import com.couchbase.connect.kafka.handler.source.MultiSourceHandler;
import com.couchbase.connect.kafka.handler.source.SourceHandler;
import com.couchbase.connect.kafka.handler.source.SourceHandlerParams;
import com.couchbase.connect.kafka.handler.source.SourceRecordBuilder;
import com.couchbase.connect.kafka.util.ConnectHelper;
import com.couchbase.connect.kafka.util.FirstCallTracker;
import com.couchbase.connect.kafka.util.JmxHelper;
import com.couchbase.connect.kafka.util.ScopeAndCollection;
import com.couchbase.connect.kafka.util.TopicMap;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.Watchdog;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
import io.micrometer.jmx.JmxMeterRegistry;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.jspecify.annotations.NullMarked;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask.class */
public class CouchbaseSourceTask extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSourceTask.class);
    private static final long STOP_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
    private static final ThreadFactory cleanupThreadFactory = new CouchbaseThreadFactory("cb-source-task-cleanup-");
    private String connectorName;
    private volatile CouchbaseReader couchbaseReader;
    private BlockingQueue<DocumentChange> queue;
    private BlockingQueue<Throwable> errorQueue;
    private String defaultTopicTemplate;
    private Map<ScopeAndCollection, String> collectionToTopic;
    private String bucket;
    private Filter filter;
    private boolean filterIsNoop;
    private MultiSourceHandler sourceHandler;
    private CouchbaseHeaderSetter headerSetter;
    private int batchSizeMax;
    private boolean connectorNameInOffsets;
    private boolean noValue;
    private SourceDocumentLifecycle lifecycle;
    private volatile MeterRegistry meterRegistry;
    private Counter filteredCounter;
    private Timer handlerTimer;
    private Timer filterTimer;
    private Timer timeBetweenPollsTimer;
    private NanoTimestamp endOfLastPoll;
    private Optional<String> blackHoleTopic;
    private Optional<String> intialOffsetTopic;
    private final CountDownLatch startupComplete = new CountDownLatch(1);
    private final SourceTaskLifecycle taskLifecycle = new SourceTaskLifecycle();
    private final Watchdog watchdog = new Watchdog(this.taskLifecycle.taskUuid());
    private final FirstCallTracker start = new FirstCallTracker();
    private final FirstCallTracker cleanup = new FirstCallTracker();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.connect.kafka.CouchbaseSourceTask$2, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$couchbase$client$core$logging$RedactionLevel = new int[RedactionLevel.values().length];

        static {
            try {
                $SwitchMap$com$couchbase$client$core$logging$RedactionLevel[RedactionLevel.FULL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$couchbase$client$core$logging$RedactionLevel[RedactionLevel.NONE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$couchbase$client$core$logging$RedactionLevel[RedactionLevel.PARTIAL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask$ConversionResult.class */
    public static class ConversionResult {
        public final List<SourceRecord> records;
        public final int published;
        public final int dropped;
        public final int synthetic;

        public ConversionResult(List<SourceRecord> list, int i, int i2, int i3) {
            this.records = list;
            this.published = i;
            this.dropped = i2;
            this.synthetic = i3;
        }
    }

    private String taskUuid() {
        return this.taskLifecycle.taskUuid();
    }

    public String version() {
        return Version.getVersion();
    }

    public void initialize(SourceTaskContext sourceTaskContext) {
        super.initialize(sourceTaskContext);
        this.taskLifecycle.logTaskInitialized((String) sourceTaskContext.configs().get("name"));
    }

    public void commit() throws InterruptedException {
        super.commit();
        this.taskLifecycle.logOffsetCommitHook();
    }

    public void start(Map<String, String> map) {
        try {
            if (this.start.alreadyCalled()) {
                throw new IllegalStateException("This source task's start() method has already been called; this violates an important assumption about how the Kafka Connect framework manages the SourceTask lifecycle. taskUuid=" + taskUuid());
            }
            try {
                this.watchdog.start();
                this.connectorName = map.get("name");
                try {
                    CouchbaseSourceTaskConfig couchbaseSourceTaskConfig = (CouchbaseSourceTaskConfig) ConfigHelper.parse(CouchbaseSourceTaskConfig.class, map);
                    if (CbStrings.isNullOrEmpty(this.connectorName)) {
                        throw new ConfigException("Connector must have a non-blank 'name' config property.");
                    }
                    String orElse = ConnectHelper.getTaskIdFromLoggingContext().orElse(couchbaseSourceTaskConfig.maybeTaskId());
                    this.meterRegistry = newMeterRegistry(this.connectorName, orElse, couchbaseSourceTaskConfig);
                    this.handlerTimer = this.meterRegistry.timer("handler", new String[0]);
                    this.filterTimer = this.meterRegistry.timer("filter", new String[0]);
                    this.filteredCounter = this.meterRegistry.counter("filtered.out", new String[0]);
                    this.timeBetweenPollsTimer = this.meterRegistry.timer("time.between.polls", new String[0]);
                    LogRedaction.setRedactionLevel(couchbaseSourceTaskConfig.logRedaction());
                    com.couchbase.client.dcp.core.logging.RedactionLevel.set(toDcp(couchbaseSourceTaskConfig.logRedaction()));
                    Map<String, String> unmodifiableMap = Collections.unmodifiableMap(map);
                    this.lifecycle = SourceDocumentLifecycle.create(taskUuid(), couchbaseSourceTaskConfig);
                    this.filter = (Filter) Utils.newInstance(couchbaseSourceTaskConfig.eventFilter());
                    this.filter.init(unmodifiableMap);
                    this.filterIsNoop = this.filter.getClass().equals(AllPassFilter.class);
                    this.sourceHandler = createSourceHandler(couchbaseSourceTaskConfig);
                    this.sourceHandler.init(unmodifiableMap);
                    this.headerSetter = new CouchbaseHeaderSetter(couchbaseSourceTaskConfig.headerNamePrefix(), couchbaseSourceTaskConfig.headers());
                    this.blackHoleTopic = Optional.ofNullable(CbStrings.emptyToNull(couchbaseSourceTaskConfig.blackHoleTopic().trim()));
                    this.intialOffsetTopic = Optional.ofNullable(CbStrings.emptyToNull(couchbaseSourceTaskConfig.initialOffsetTopic().trim()));
                    this.defaultTopicTemplate = couchbaseSourceTaskConfig.topic();
                    this.collectionToTopic = TopicMap.parseCollectionToTopic(couchbaseSourceTaskConfig.collectionToTopic());
                    this.bucket = couchbaseSourceTaskConfig.bucket();
                    this.connectorNameInOffsets = couchbaseSourceTaskConfig.connectorNameInOffsets();
                    this.batchSizeMax = couchbaseSourceTaskConfig.batchSizeMax();
                    this.noValue = couchbaseSourceTaskConfig.noValue();
                    PartitionSet parse = PartitionSet.parse(couchbaseSourceTaskConfig.partitions());
                    this.taskLifecycle.logTaskStarted(this.connectorName, parse);
                    List list = parse.toList();
                    Map<Integer, SourceOffset> readSourceOffsets = readSourceOffsets(list);
                    HashSet hashSet = new HashSet(list);
                    hashSet.removeAll(readSourceOffsets.keySet());
                    this.taskLifecycle.logSourceOffsetsRead(readSourceOffsets, PartitionSet.from(hashSet));
                    this.queue = new LinkedBlockingQueue();
                    this.errorQueue = new LinkedBlockingQueue(1);
                    this.couchbaseReader = new CouchbaseReader(couchbaseSourceTaskConfig, this.connectorName, orElse, this.queue, this.errorQueue, list, readSourceOffsets, this.lifecycle, this.meterRegistry, this.intialOffsetTopic.isPresent(), this.taskLifecycle);
                    this.couchbaseReader.start();
                    this.endOfLastPoll = NanoTimestamp.now();
                    this.watchdog.enterState("started");
                    this.startupComplete.countDown();
                } catch (ConfigException e) {
                    throw new ConnectException("Couldn't start CouchbaseSourceTask due to configuration error", e);
                }
            } catch (Exception e2) {
                LOGGER.info("Scheduling cleanup because task failed to start. taskUuid={}", taskUuid(), e2);
                this.startupComplete.countDown();
                cleanup();
                throw e2;
            }
        } catch (Throwable th) {
            this.startupComplete.countDown();
            throw th;
        }
    }

    private static MeterRegistry newMeterRegistry(String str, String str2, CouchbaseSourceTaskConfig couchbaseSourceTaskConfig) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("connector", ObjectName.quote(str));
        linkedHashMap.put("task", str2);
        JmxMeterRegistry newJmxMeterRegistry = JmxHelper.newJmxMeterRegistry("kafka.connect.couchbase", linkedHashMap);
        CompositeMeterRegistry compositeMeterRegistry = new CompositeMeterRegistry();
        compositeMeterRegistry.add(newJmxMeterRegistry);
        Optional ofNullable = Optional.ofNullable(newLoggingMeterRegistry(couchbaseSourceTaskConfig));
        Objects.requireNonNull(compositeMeterRegistry);
        ofNullable.ifPresent(compositeMeterRegistry::add);
        return compositeMeterRegistry;
    }

    private static MeterRegistry newLoggingMeterRegistry(CouchbaseSourceTaskConfig couchbaseSourceTaskConfig) {
        Duration metricsInterval = couchbaseSourceTaskConfig.metricsInterval();
        String keyName = ConfigHelper.keyName(CouchbaseSourceTaskConfig.class, (v0) -> {
            v0.metricsInterval();
        });
        if (metricsInterval.isZero()) {
            LOGGER.info("Metrics logging is disabled because config property '" + keyName + "' is set to 0.");
            return null;
        }
        Logger logger = LoggerFactory.getLogger("com.couchbase.connect.kafka.metrics");
        LOGGER.info("Will log metrics to logging category 'com.couchbase.connect.kafka.metrics' at interval: " + metricsInterval);
        LoggingMeterRegistry.Builder builder = LoggingMeterRegistry.builder(str -> {
            if ("logging.step".equals(str)) {
                return metricsInterval.toMillis() + "ms";
            }
            return null;
        });
        Objects.requireNonNull(logger);
        return builder.loggingSink(logger::info).build();
    }

    private com.couchbase.client.dcp.core.logging.RedactionLevel toDcp(RedactionLevel redactionLevel) {
        switch (AnonymousClass2.$SwitchMap$com$couchbase$client$core$logging$RedactionLevel[redactionLevel.ordinal()]) {
            case 1:
                return com.couchbase.client.dcp.core.logging.RedactionLevel.FULL;
            case 2:
                return com.couchbase.client.dcp.core.logging.RedactionLevel.NONE;
            case 3:
                return com.couchbase.client.dcp.core.logging.RedactionLevel.PARTIAL;
            default:
                throw new IllegalArgumentException("Unrecognized redaction level: " + redactionLevel);
        }
    }

    public void stop() {
        this.taskLifecycle.logTaskStopped();
        LOGGER.info("Scheduling cleanup because task was asked to stop. taskUuid={}", taskUuid());
        cleanup();
    }

    public List<SourceRecord> poll() throws InterruptedException {
        this.timeBetweenPollsTimer.record(this.endOfLastPoll.elapsed());
        this.watchdog.enterState("polling");
        try {
            try {
                checkErrorQueue();
                DocumentChange poll = this.queue.poll(1L, TimeUnit.SECONDS);
                if (poll == null) {
                    LOGGER.debug("Poll returns 0 results; taskUuid={}", taskUuid());
                    this.watchdog.enterState("waiting for next poll (after 0 records)");
                    this.endOfLastPoll = NanoTimestamp.now();
                    return null;
                }
                ArrayList arrayList = new ArrayList();
                try {
                    this.watchdog.enterState("draining queue");
                    arrayList.add(poll);
                    this.queue.drainTo(arrayList, this.batchSizeMax - 1);
                    this.watchdog.enterState("converting to source records (" + arrayList.size() + " events)");
                    ConversionResult convertToSourceRecords = convertToSourceRecords(arrayList);
                    this.filteredCounter.increment(convertToSourceRecords.dropped);
                    if (convertToSourceRecords.synthetic > 0) {
                        LOGGER.info("Poll returns {} result(s) ({} synthetic; filtered out {}); taskUuid={}", new Object[]{Integer.valueOf(convertToSourceRecords.published + convertToSourceRecords.synthetic), Integer.valueOf(convertToSourceRecords.synthetic), Integer.valueOf(convertToSourceRecords.dropped), taskUuid()});
                    } else {
                        LOGGER.info("Poll returns {} result(s) (filtered out {}); taskUuid={}", new Object[]{Integer.valueOf(convertToSourceRecords.published), Integer.valueOf(convertToSourceRecords.dropped), taskUuid()});
                    }
                    this.watchdog.enterState("waiting for next poll (after " + convertToSourceRecords.records.size() + " records)");
                    List<SourceRecord> list = convertToSourceRecords.records;
                    arrayList.forEach((v0) -> {
                        v0.flowControlAck();
                    });
                    this.endOfLastPoll = NanoTimestamp.now();
                    return list;
                } catch (Throwable th) {
                    arrayList.forEach((v0) -> {
                        v0.flowControlAck();
                    });
                    throw th;
                }
            } catch (Throwable th2) {
                this.watchdog.enterState("polling reported error: " + th2);
                throw th2;
            }
        } catch (Throwable th3) {
            this.endOfLastPoll = NanoTimestamp.now();
            throw th3;
        }
    }

    private boolean isSourceOffsetUpdate(SourceRecord sourceRecord) {
        return this.blackHoleTopic.isPresent() && this.blackHoleTopic.get().equals(sourceRecord.topic());
    }

    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        if (sourceRecord instanceof CouchbaseSourceRecord) {
            CouchbaseSourceRecord couchbaseSourceRecord = (CouchbaseSourceRecord) sourceRecord;
            if (isSourceOffsetUpdate(couchbaseSourceRecord)) {
                this.lifecycle.logSourceOffsetUpdateCommittedToBlackHoleTopic(couchbaseSourceRecord, recordMetadata);
            } else {
                this.lifecycle.logCommittedToKafkaTopic(couchbaseSourceRecord, recordMetadata);
            }
        } else {
            LOGGER.warn("Committed a record we didn't create? Record key {}; taskUuid={}", sourceRecord.key(), taskUuid());
        }
        this.sourceHandler.onRecordCommitted(sourceRecord, recordMetadata);
    }

    private void checkErrorQueue() throws ConnectException {
        Throwable poll = this.errorQueue.poll();
        if (poll != null) {
            throw new ConnectException(poll);
        }
    }

    private String getDefaultTopic(DocumentEvent documentEvent) {
        CollectionMetadata collectionMetadata = documentEvent.collectionMetadata();
        return this.defaultTopicTemplate.replace("${bucket}", this.bucket).replace("${scope}", collectionMetadata.scopeName()).replace("${collection}", collectionMetadata.collectionName()).replace("%", "_");
    }

    private ConversionResult convertToSourceRecords(List<DocumentChange> list) {
        ArrayList arrayList = new ArrayList(list.size());
        int i = 0;
        int i2 = 0;
        for (DocumentChange documentChange : list) {
            DocumentEvent create = DocumentEvent.create(documentChange, this.bucket);
            if (CouchbaseReader.isSyntheticInitialOffsetTombstone(documentChange) && this.intialOffsetTopic.isPresent()) {
                SourceRecord createSourceOffsetUpdateRecord = createSourceOffsetUpdateRecord(documentChange.getKey(), this.intialOffsetTopic.get(), documentChange);
                this.lifecycle.logConvertedToKafkaRecord(documentChange, createSourceOffsetUpdateRecord);
                arrayList.add(createSourceOffsetUpdateRecord);
                i2++;
            } else if (this.filterIsNoop || this.filterTimer.record(() -> {
                return this.filter.pass(create);
            })) {
                List<CouchbaseSourceRecord> convertToSourceRecords = convertToSourceRecords(documentChange, create);
                if (convertToSourceRecords.isEmpty()) {
                    this.lifecycle.logSkippedBecauseHandlerSaysIgnore(documentChange);
                    i++;
                    this.blackHoleTopic.ifPresent(str -> {
                        arrayList.add(createSourceOffsetUpdateRecord(str, documentChange));
                    });
                } else {
                    convertToSourceRecords.forEach(couchbaseSourceRecord -> {
                        this.lifecycle.logConvertedToKafkaRecord(documentChange, couchbaseSourceRecord);
                    });
                    arrayList.addAll(convertToSourceRecords);
                }
            } else {
                this.lifecycle.logSkippedBecauseFilterSaysIgnore(documentChange);
                i++;
                this.blackHoleTopic.ifPresent(str2 -> {
                    arrayList.add(createSourceOffsetUpdateRecord(str2, documentChange));
                });
            }
        }
        int size = arrayList.size() - i2;
        if (this.blackHoleTopic.isPresent()) {
            size -= i;
        }
        return new ConversionResult(arrayList, size, i, i2);
    }

    private SourceRecord createSourceOffsetUpdateRecord(String str, DocumentChange documentChange) {
        return createSourceOffsetUpdateRecord("ignored-" + documentChange.getVbucket(), str, documentChange);
    }

    private SourceRecord createSourceOffsetUpdateRecord(String str, String str2, DocumentChange documentChange) {
        return new SourceRecordBuilder().key(str).build(documentChange, sourcePartition(documentChange.getVbucket()), sourceOffset(documentChange), str2);
    }

    private List<CouchbaseSourceRecord> convertToSourceRecords(DocumentChange documentChange, DocumentEvent documentEvent) {
        String orDefault = this.collectionToTopic.getOrDefault(scopeAndCollection(documentEvent), getDefaultTopic(documentEvent));
        List<SourceRecordBuilder> list = (List) this.handlerTimer.record(() -> {
            return this.sourceHandler.convertToSourceRecords(new SourceHandlerParams(documentEvent, orDefault, this.noValue));
        });
        Objects.requireNonNull(list, "The source handler's convertToSourceRecords() method returned null instead of a List; this is forbidden.");
        if (list.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList(list.size());
        for (SourceRecordBuilder sourceRecordBuilder : list) {
            Objects.requireNonNull(sourceRecordBuilder, "The source handler's convertToSourceRecords() method returned a list containing a null item; this is forbidden.");
            this.headerSetter.setHeaders(sourceRecordBuilder.headers(), documentEvent);
            arrayList.add(sourceRecordBuilder.build(documentChange, sourcePartition(documentEvent.partition()), sourceOffset(documentChange), orDefault));
        }
        return arrayList;
    }

    private void cleanup() {
        if (this.cleanup.alreadyCalled()) {
            LOGGER.info("Ignoring redundant cleanup request; taskUuid={}", taskUuid());
        } else {
            cleanupThreadFactory.newThread(() -> {
                try {
                    Thread.currentThread().setName(Thread.currentThread().getName() + "-" + taskUuid());
                    if (this.startupComplete.getCount() != 0) {
                        LOGGER.info("Task was asked to stop before it finished starting; deferring cleanup until start() completes. taskUuid={}", taskUuid());
                        Duration ofMinutes = Duration.ofMinutes(30L);
                        if (!this.startupComplete.await(ofMinutes.toMillis(), TimeUnit.MILLISECONDS)) {
                            LOGGER.error("This task's start() method did not complete within the safeguard timeout of {}. Making a last-ditch effort to clean up. taskUuid={}", ofMinutes, taskUuid());
                        }
                    }
                    LOGGER.info("Cleaning up now; taskUuid={}", taskUuid());
                    this.watchdog.stop();
                    if (this.meterRegistry != null) {
                        this.meterRegistry.close();
                    }
                    if (this.couchbaseReader != null) {
                        this.couchbaseReader.shutdown();
                        try {
                            this.couchbaseReader.join(STOP_TIMEOUT_MILLIS);
                            if (this.couchbaseReader.isAlive()) {
                                LOGGER.error("Reader thread is still alive after shutdown request.");
                            }
                        } catch (InterruptedException e) {
                            LOGGER.error("Interrupted while joining reader thread.", e);
                        }
                    }
                } catch (Throwable th) {
                    LOGGER.error("Error while cleaning up resources; taskUuid={}", taskUuid(), th);
                } finally {
                    this.taskLifecycle.logTaskCleanupComplete();
                }
            }).start();
        }
    }

    private Map<Integer, SourceOffset> readSourceOffsets(Collection<Integer> collection) {
        Map offsets = this.context.offsetStorageReader().offsets(sourcePartitions(collection));
        LOGGER.debug("Raw source offsets: {}; taskUuid={}", offsets, taskUuid());
        HashSet hashSet = new HashSet(collection);
        TreeMap treeMap = new TreeMap();
        offsets.forEach((map, map2) -> {
            int parseInt = Integer.parseInt((String) map.get("partition"));
            hashSet.remove(Integer.valueOf(parseInt));
            if (map2 != null) {
                treeMap.put(Integer.valueOf(parseInt), SourceOffset.fromMap(map2));
            }
        });
        if (!hashSet.isEmpty()) {
            LOGGER.error("Offset storage reader returned no information about these partitions: {}; taskUuid={}", PartitionSet.from(hashSet), taskUuid());
        }
        return treeMap;
    }

    private List<Map<String, Object>> sourcePartitions(Collection<Integer> collection) {
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(sourcePartition(it.next().intValue()));
        }
        return arrayList;
    }

    private Map<String, Object> sourcePartition(int i) {
        HashMap hashMap = new HashMap(3);
        hashMap.put("bucket", this.bucket);
        hashMap.put("partition", String.valueOf(i));
        if (this.connectorNameInOffsets) {
            hashMap.put("connector", this.connectorName);
        }
        return hashMap;
    }

    private static Map<String, Object> sourceOffset(DocumentChange documentChange) {
        return new SourceOffset(documentChange.getOffset()).toMap();
    }

    private static ScopeAndCollection scopeAndCollection(DocumentEvent documentEvent) {
        CollectionMetadata collectionMetadata = documentEvent.collectionMetadata();
        return new ScopeAndCollection(collectionMetadata.scopeName(), collectionMetadata.collectionName());
    }

    @NullMarked
    private static MultiSourceHandler createSourceHandler(CouchbaseSourceTaskConfig couchbaseSourceTaskConfig) {
        Object newInstance = Utils.newInstance(couchbaseSourceTaskConfig.sourceHandler());
        if (newInstance instanceof MultiSourceHandler) {
            return (MultiSourceHandler) newInstance;
        }
        if (newInstance instanceof SourceHandler) {
            final SourceHandler sourceHandler = (SourceHandler) newInstance;
            return new MultiSourceHandler() { // from class: com.couchbase.connect.kafka.CouchbaseSourceTask.1
                @Override // com.couchbase.connect.kafka.handler.source.MultiSourceHandler
                public void init(Map<String, String> map) {
                    SourceHandler.this.init(map);
                }

                @Override // com.couchbase.connect.kafka.handler.source.MultiSourceHandler
                public List<SourceRecordBuilder> convertToSourceRecords(SourceHandlerParams sourceHandlerParams) {
                    SourceRecordBuilder handle = SourceHandler.this.handle(sourceHandlerParams);
                    return handle == null ? Collections.emptyList() : Collections.singletonList(handle);
                }

                @Override // com.couchbase.connect.kafka.handler.source.MultiSourceHandler
                public void onRecordCommitted(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
                    SourceHandler.this.onRecordCommitted(sourceRecord, recordMetadata);
                }
            };
        }
        throw new ConfigException("Invalid value for connector config property '" + ConfigHelper.keyName(CouchbaseSourceTaskConfig.class, (v0) -> {
            v0.sourceHandler();
        }) + "' ; Source handler must be an instance of " + SourceHandler.class.getName() + " or " + MultiSourceHandler.class.getName() + ", but got: " + newInstance.getClass().getName());
    }
}
