package org.apache.kafka.connect.util;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/util/KafkaBasedLog.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.0-rc-0.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/util/KafkaBasedLog.class */
public class KafkaBasedLog<K, V> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaBasedLog.class);
    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
    private Time time;
    private final String topic;
    private final Map<String, Object> producerConfigs;
    private final Map<String, Object> consumerConfigs;
    private final Callback<ConsumerRecord<K, V>> consumedCallback;
    private Consumer<K, V> consumer;
    private Producer<K, V> producer;
    private Thread thread;
    private boolean stopRequested = false;
    private Queue<Callback<Void>> readLogEndOffsetCallbacks = new ArrayDeque();
    private Runnable initializer;

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/util/KafkaBasedLog$WorkThread.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.0-rc-0.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/util/KafkaBasedLog$WorkThread.class */
    private class WorkThread extends Thread {
        public WorkThread() {
            super("KafkaBasedLog Work Thread - " + KafkaBasedLog.this.topic);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int size;
            try {
                KafkaBasedLog.log.trace("{} started execution", this);
            } catch (Throwable th) {
                KafkaBasedLog.log.error("Unexpected exception in {}", this, th);
                return;
            }
            while (true) {
                synchronized (KafkaBasedLog.this) {
                    if (KafkaBasedLog.this.stopRequested) {
                        return;
                    } else {
                        size = KafkaBasedLog.this.readLogEndOffsetCallbacks.size();
                    }
                    KafkaBasedLog.log.error("Unexpected exception in {}", this, th);
                    return;
                }
                if (size > 0) {
                    try {
                        KafkaBasedLog.this.readToLogEnd();
                        KafkaBasedLog.log.trace("Finished read to end log for topic {}", KafkaBasedLog.this.topic);
                    } catch (WakeupException e) {
                    }
                }
                synchronized (KafkaBasedLog.this) {
                    for (int i = 0; i < size; i++) {
                        ((Callback) KafkaBasedLog.this.readLogEndOffsetCallbacks.poll()).onCompletion(null, null);
                    }
                }
                try {
                    KafkaBasedLog.this.poll(2147483647L);
                } catch (WakeupException e2) {
                }
            }
        }
    }

    public KafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<K, V>> callback, Time time, Runnable runnable) {
        this.topic = str;
        this.producerConfigs = map;
        this.consumerConfigs = map2;
        this.consumedCallback = callback;
        this.time = time;
        this.initializer = runnable != null ? runnable : new Runnable() { // from class: org.apache.kafka.connect.util.KafkaBasedLog.1
            @Override // java.lang.Runnable
            public void run() {
            }
        };
    }

    public void start() {
        log.info("Starting KafkaBasedLog with topic " + this.topic);
        this.initializer.run();
        this.producer = createProducer();
        this.consumer = createConsumer();
        ArrayList arrayList = new ArrayList();
        List<PartitionInfo> list = null;
        long milliseconds = this.time.milliseconds();
        while (list == null && this.time.milliseconds() - milliseconds < CREATE_TOPIC_TIMEOUT_MS) {
            list = this.consumer.partitionsFor(this.topic);
            Utils.sleep(Math.min(this.time.milliseconds() - milliseconds, 1000L));
        }
        if (list == null) {
            throw new ConnectException("Could not look up partition metadata for offset backing store topic in allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.");
        }
        for (PartitionInfo partitionInfo : list) {
            arrayList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        this.consumer.assign(arrayList);
        this.consumer.seekToBeginning(arrayList);
        readToLogEnd();
        this.thread = new WorkThread();
        this.thread.start();
        log.info("Finished reading KafkaBasedLog for topic " + this.topic);
        log.info("Started KafkaBasedLog for topic " + this.topic);
    }

    public void stop() {
        log.info("Stopping KafkaBasedLog for topic " + this.topic);
        synchronized (this) {
            this.stopRequested = true;
        }
        this.consumer.wakeup();
        try {
            this.thread.join();
            try {
                this.producer.close();
            } catch (KafkaException e) {
                log.error("Failed to stop KafkaBasedLog producer", (Throwable) e);
            }
            try {
                this.consumer.close();
            } catch (KafkaException e2) {
                log.error("Failed to stop KafkaBasedLog consumer", (Throwable) e2);
            }
            log.info("Stopped KafkaBasedLog for topic " + this.topic);
        } catch (InterruptedException e3) {
            throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting down it's producer and consumer.", e3);
        }
    }

    public void readToEnd(Callback<Void> callback) {
        log.trace("Starting read to end log for topic {}", this.topic);
        this.producer.flush();
        synchronized (this) {
            this.readLogEndOffsetCallbacks.add(callback);
        }
        this.consumer.wakeup();
    }

    public void flush() {
        this.producer.flush();
    }

    public Future<Void> readToEnd() {
        FutureCallback futureCallback = new FutureCallback(null);
        readToEnd(futureCallback);
        return futureCallback;
    }

    public void send(K k, V v) {
        send(k, v, null);
    }

    public void send(K k, V v, org.apache.kafka.clients.producer.Callback callback) {
        this.producer.send(new ProducerRecord<>(this.topic, k, v), callback);
    }

    private Producer<K, V> createProducer() {
        this.producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
        this.producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return new KafkaProducer(this.producerConfigs);
    }

    private Consumer<K, V> createConsumer() {
        this.consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        this.consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new KafkaConsumer(this.consumerConfigs);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void poll(long j) {
        try {
            Iterator<ConsumerRecord<K, V>> it = this.consumer.poll(Duration.ofMillis(j)).iterator();
            while (it.hasNext()) {
                this.consumedCallback.onCompletion(null, it.next());
            }
        } catch (WakeupException e) {
            throw e;
        } catch (KafkaException e2) {
            log.error("Error polling: " + e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void readToLogEnd() {
        log.trace("Reading to end of offset log");
        Map<TopicPartition, Long> endOffsets = this.consumer.endOffsets(this.consumer.assignment());
        log.trace("Reading to end of log offsets {}", endOffsets);
        while (!endOffsets.isEmpty()) {
            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
            while (true) {
                if (it.hasNext()) {
                    Map.Entry<TopicPartition, Long> next = it.next();
                    if (this.consumer.position(next.getKey()) < next.getValue().longValue()) {
                        poll(2147483647L);
                        break;
                    }
                    it.remove();
                }
            }
        }
    }
}
