package org.apache.pulsar.io.kafka.connect;

import com.google.common.base.Preconditions;
import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-3.2.1.jar:org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.class */
public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
    // Context handed to the source task in open(); wraps the offset reader and worker config.
    private SourceTaskContext sourceTaskContext;
    // Set only when the config names a connector class (CONNECTOR_CLASS); otherwise stays null.
    private SourceConnector connector;
    // The Kafka Connect task that read() polls for records.
    private SourceTask sourceTask;
    // Converters instantiated reflectively in open() from the "key.converter" / "value.converter" settings.
    public Converter keyConverter;
    public Converter valueConverter;
    // Created by read() for each new batch; completed by record ack()/fail() and the flush callback.
    private CompletableFuture<Void> flushFuture;
    // Offset storage started in open() and stopped in close().
    private OffsetBackingStore offsetStore;
    private OffsetStorageReader offsetReader;
    // Namespace used to build each record's destination topic ("persistent://<namespace>/<topic>").
    private String topicNamespace;
    // Writer that records flush through once a batch is fully acknowledged.
    public OffsetStorageWriter offsetWriter;
    // Config key naming the Kafka SourceConnector class to load in open().
    public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AbstractKafkaConnectSource.class);
    // Shared immutable values returned by every record's getProperties() / getRecordSequence().
    private static final Map<String, String> PROPERTIES = Collections.emptyMap();
    private static final Optional<Long> RECORD_SEQUENCE = Optional.empty();
    // Iterator over the most recently polled batch; null means read() must poll again.
    private Iterator<SourceRecord> currentBatch = null;
    // Count of polled records not yet acknowledged (or skipped as empty by read()).
    private final AtomicInteger outstandingRecords = new AtomicInteger(0);

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-3.2.1.jar:org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource$AbstractKafkaSourceRecord.class */
    public abstract class AbstractKafkaSourceRecord<T> implements Record {
        Optional<String> key;
        T value;
        Optional<String> topicName;
        Optional<Long> eventTime;
        Optional<String> partitionId;
        Optional<String> destinationTopic;
        Optional<Integer> partitionIndex;
        KafkaSchemaWrappedSchema keySchema;
        KafkaSchemaWrappedSchema valueSchema;

        /* JADX INFO: Access modifiers changed from: package-private */
        public AbstractKafkaSourceRecord(SourceRecord sourceRecord) {
            this.destinationTopic = Optional.of("persistent://" + AbstractKafkaConnectSource.this.topicNamespace + "/" + sourceRecord.topic());
            this.partitionIndex = Optional.ofNullable(sourceRecord.kafkaPartition());
        }

        public Optional<Long> getRecordSequence() {
            return AbstractKafkaConnectSource.RECORD_SEQUENCE;
        }

        public Map<String, String> getProperties() {
            return AbstractKafkaConnectSource.PROPERTIES;
        }

        public boolean isEmpty() {
            return this.value == null;
        }

        private void completedFlushOffset(Throwable th, Void r8) {
            if (th != null) {
                AbstractKafkaConnectSource.log.error("Failed to flush offsets to storage: ", th);
                AbstractKafkaConnectSource.this.currentBatch = null;
                AbstractKafkaConnectSource.this.offsetWriter.cancelFlush();
                AbstractKafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
                return;
            }
            try {
                AbstractKafkaConnectSource.this.sourceTask.commit();
                AbstractKafkaConnectSource.log.info("Finished flushing offsets to storage");
                AbstractKafkaConnectSource.this.currentBatch = null;
                AbstractKafkaConnectSource.this.flushFuture.complete(null);
            } catch (InterruptedException e) {
                AbstractKafkaConnectSource.log.warn("Flush of {} offsets interrupted, cancelling", this);
                Thread.currentThread().interrupt();
                AbstractKafkaConnectSource.this.offsetWriter.cancelFlush();
                AbstractKafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("Failed to commit offsets", e));
            } catch (Throwable th2) {
                AbstractKafkaConnectSource.log.warn("Flush of {} offsets failed, cancelling", this);
                AbstractKafkaConnectSource.this.offsetWriter.cancelFlush();
                AbstractKafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("Failed to commit offsets", th2));
            }
        }

        public void ack() {
            if (!(AbstractKafkaConnectSource.this.outstandingRecords.decrementAndGet() == 0) || AbstractKafkaConnectSource.this.flushFuture == null) {
                return;
            }
            if (!AbstractKafkaConnectSource.this.offsetWriter.beginFlush()) {
                AbstractKafkaConnectSource.log.error("When beginFlush, No offsets to commit!");
                AbstractKafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("No Offsets Added Error when beginFlush"));
            } else if (AbstractKafkaConnectSource.this.offsetWriter.doFlush(this::completedFlushOffset) == null) {
                AbstractKafkaConnectSource.log.error("No offsets to commit!");
                AbstractKafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
            }
        }

        public void fail() {
            if (AbstractKafkaConnectSource.this.flushFuture != null) {
                AbstractKafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("Sink Error"));
            }
        }

        public Optional<String> getKey() {
            return this.key;
        }

        public T getValue() {
            return this.value;
        }

        public Optional<String> getTopicName() {
            return this.topicName;
        }

        public Optional<Long> getEventTime() {
            return this.eventTime;
        }

        public Optional<String> getPartitionId() {
            return this.partitionId;
        }

        public Optional<String> getDestinationTopic() {
            return this.destinationTopic;
        }

        public Optional<Integer> getPartitionIndex() {
            return this.partitionIndex;
        }
    }

    /**
     * Initialises converters, offset storage and the Kafka Connect source task.
     *
     * <p>Only the String-valued entries of {@code map} are passed on to the worker config,
     * the connector and the task. When a converter is an {@link AvroConverter}, it is replaced
     * by one backed by a {@link MockSchemaRegistryClient} and {@code "schema.registry.url"} is
     * written into {@code map} (the caller's map is modified).
     *
     * <p>If {@link #CONNECTOR_CLASS} is present, the connector is instantiated and started and
     * its single task config is used; otherwise the task class named by
     * {@code TaskConfig.TASK_CLASS_CONFIG} is instantiated directly with the String config.
     *
     * @param map           connector configuration
     * @param sourceContext Pulsar source context; its client backs the offset store
     * @throws Exception if any class cannot be loaded or instantiated, or a component fails to start
     */
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        Map<String, String> stringConfig = new HashMap<>();
        map.forEach((name, setting) -> {
            if (setting instanceof String) {
                stringConfig.put(name, (String) setting);
            }
        });
        this.topicNamespace = stringConfig.get(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG);

        this.keyConverter = Class.forName(stringConfig.get("key.converter"))
                .asSubclass(Converter.class)
                .getDeclaredConstructor()
                .newInstance();
        this.valueConverter = Class.forName(stringConfig.get("value.converter"))
                .asSubclass(Converter.class)
                .getDeclaredConstructor()
                .newInstance();
        if (this.keyConverter instanceof AvroConverter) {
            this.keyConverter = new AvroConverter(new MockSchemaRegistryClient());
            map.put("schema.registry.url", "mock");
        }
        if (this.valueConverter instanceof AvroConverter) {
            this.valueConverter = new AvroConverter(new MockSchemaRegistryClient());
            map.put("schema.registry.url", "mock");
        }
        this.keyConverter.configure(map, true);
        this.valueConverter.configure(map, false);

        this.offsetStore = new PulsarOffsetBackingStore(sourceContext.getPulsarClient());
        PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
        this.offsetStore.configure(pulsarKafkaWorkerConfig);
        this.offsetStore.start();
        this.offsetReader = new OffsetStorageReaderImpl(
                this.offsetStore, "pulsar-kafka-connect-adaptor", this.keyConverter, this.valueConverter);
        this.offsetWriter = new OffsetStorageWriter(
                this.offsetStore, "pulsar-kafka-connect-adaptor", this.keyConverter, this.valueConverter);
        this.sourceTaskContext = new PulsarIOSourceTaskContext(this.offsetReader, pulsarKafkaWorkerConfig);

        // Typed as Map (not HashMap): the connector returns an arbitrary Map implementation.
        final Map<String, String> taskConfig;
        Object connectorClassName = map.get(CONNECTOR_CLASS);
        if (connectorClassName != null) {
            this.connector = (SourceConnector) Class.forName(connectorClassName.toString())
                    .getConstructor()
                    .newInstance();
            this.sourceTask = (SourceTask) this.connector.taskClass().getConstructor().newInstance();
            this.connector.initialize(new PulsarKafkaSinkContext());
            this.connector.start(stringConfig);
            // Exactly one task is requested, so exactly one task config is expected back.
            List<Map<String, String>> taskConfigs = this.connector.taskConfigs(1);
            Preconditions.checkNotNull(taskConfigs);
            Preconditions.checkArgument(taskConfigs.size() == 1);
            taskConfig = taskConfigs.get(0);
        } else {
            // No connector configured: instantiate the task class directly from the config.
            this.sourceTask = Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG))
                    .asSubclass(SourceTask.class)
                    .getDeclaredConstructor()
                    .newInstance();
            taskConfig = stringConfig;
        }
        this.sourceTask.initialize(this.sourceTaskContext);
        this.sourceTask.start(taskConfig);
    }

    /**
     * Returns the next non-empty record from the Kafka Connect task.
     *
     * <p>When no batch is in progress, a new flush future is created and the task is polled;
     * an empty or {@code null} poll result causes a one-second sleep followed by another poll.
     * Records whose {@code isEmpty()} is true are skipped and counted as no longer outstanding.
     * Once the current batch is exhausted, this method blocks until the batch's flush future
     * completes, then starts over with a fresh poll.
     *
     * @return the next non-empty record
     * @throws Exception with message {@code "Flush failed"} if the flush future failed;
     *                   also propagates exceptions from polling, sleeping or waiting
     */
    public synchronized Record<T> read() throws Exception {
        while (true) {
            if (this.currentBatch == null) {
                this.flushFuture = new CompletableFuture<>();
                List<SourceRecord> polled = this.sourceTask.poll();
                if (polled == null || polled.isEmpty()) {
                    Thread.sleep(1000L);
                    // Nothing to iterate yet: poll again instead of dereferencing a null batch.
                    continue;
                }
                this.outstandingRecords.addAndGet(polled.size());
                this.currentBatch = polled.iterator();
            }
            if (this.currentBatch.hasNext()) {
                AbstractKafkaConnectSource<T>.AbstractKafkaSourceRecord<T> converted =
                        processSourceRecord2(this.currentBatch.next());
                if (!converted.isEmpty()) {
                    return converted;
                }
                // Skipped records are never acked by a consumer, so release them here.
                // NOTE(review): this decrement does not start an offset flush; if it is the one
                // that brings the counter to zero, nothing in this class completes flushFuture
                // and the wait below may never return - confirm whether that case can occur.
                this.outstandingRecords.decrementAndGet();
            } else {
                // Batch exhausted: wait until its offsets are flushed before polling again.
                try {
                    this.flushFuture.get();
                } catch (ExecutionException e) {
                    log.error("execution exception while get flushFuture", (Throwable) e);
                    throw new Exception("Flush failed", e.getCause());
                } finally {
                    // Reset on success and failure alike so the next call starts a new batch.
                    this.flushFuture = null;
                    this.currentBatch = null;
                }
            }
        }
    }

    /**
     * Stops the source task, the connector and the offset store, in that order.
     *
     * <p>Each component that was created is always given a stop attempt, even if an earlier
     * one throws, so a failing task cannot leave the offset store running. The first
     * {@link RuntimeException} is rethrown after all attempts, with later ones attached as
     * suppressed exceptions. Each field is cleared once its stop has been attempted.
     */
    public void close() {
        RuntimeException failure = null;
        if (this.sourceTask != null) {
            try {
                this.sourceTask.stop();
            } catch (RuntimeException e) {
                failure = mergeCloseFailure(failure, e);
            } finally {
                this.sourceTask = null;
            }
        }
        if (this.connector != null) {
            try {
                this.connector.stop();
            } catch (RuntimeException e) {
                failure = mergeCloseFailure(failure, e);
            } finally {
                this.connector = null;
            }
        }
        if (this.offsetStore != null) {
            try {
                this.offsetStore.stop();
            } catch (RuntimeException e) {
                failure = mergeCloseFailure(failure, e);
            } finally {
                this.offsetStore = null;
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Keeps the first close failure as primary and records any later one as suppressed. */
    private static RuntimeException mergeCloseFailure(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Converts one polled Kafka Connect record into the record type returned by {@code read()}.
     *
     * <p>{@code read()} calls this once per polled record and invokes {@code isEmpty()} on the
     * result, so implementations must not return {@code null}; results reporting empty are skipped.
     *
     * @param sourceRecord the Kafka Connect record to convert
     * @return the wrapped record
     */
    /* renamed from: processSourceRecord */
    public abstract AbstractKafkaConnectSource<T>.AbstractKafkaSourceRecord<T> processSourceRecord2(SourceRecord sourceRecord);

    /**
     * Exposes the Kafka Connect task this source polls.
     *
     * @return the current task, or {@code null} if none is set (for example after {@code close()})
     */
    public SourceTask getSourceTask() {
        return sourceTask;
    }

    /**
     * Exposes the offset writer used to flush offsets once a batch is acknowledged.
     *
     * @return the current offset writer, or {@code null} if it has not been assigned yet
     */
    public OffsetStorageWriter getOffsetWriter() {
        return offsetWriter;
    }
}
