package com.github.harbby.spark.sql.kafka;

import com.github.harbby.spark.sql.kafka.model.KafkaPartitionOffset;
import java.io.Closeable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import kafka.common.TopicAndPartition;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.base.Preconditions;
import org.spark_project.jetty.util.ConcurrentArrayQueue;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map$;

/* loaded from: input_file:com/github/harbby/spark/sql/kafka/KafkaOffsetCommitter.class */
public class KafkaOffsetCommitter extends Thread implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaOffsetCommitter.class);
    private final KafkaCluster kafkaCluster;
    private final String groupId;
    private final int commitInterval;
    private volatile boolean running = false;
    private final Queue<KafkaPartitionOffset> commitQueue = new ConcurrentArrayQueue(1024);

    public KafkaOffsetCommitter(KafkaCluster kafkaCluster, String str, int i) {
        Preconditions.checkArgument(i >= 5000, "commitInterval must >= 5000");
        this.commitInterval = i;
        this.kafkaCluster = kafkaCluster;
        this.groupId = str;
    }

    @Override // java.lang.Thread
    public synchronized void start() {
        setDaemon(true);
        super.start();
        this.running = true;
    }

    public void addAll(OffsetRange[] offsetRangeArr) {
        if (this.running) {
            for (OffsetRange offsetRange : offsetRangeArr) {
                this.commitQueue.offer(new KafkaPartitionOffset(offsetRange.topicAndPartition(), offsetRange.untilOffset()));
            }
        }
    }

    public void addAll(KafkaPartitionOffset[] kafkaPartitionOffsetArr) {
        if (this.running) {
            this.commitQueue.addAll(Arrays.asList(kafkaPartitionOffsetArr));
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.running = false;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                Thread.sleep(this.commitInterval);
                commitAll();
            } catch (Throwable th) {
                logger.error("The offset committer encountered an error: {}", th.getMessage(), th);
            }
        }
        this.running = false;
    }

    private void commitAll() throws Exception {
        HashMap hashMap = new HashMap();
        KafkaPartitionOffset poll = this.commitQueue.poll();
        while (true) {
            KafkaPartitionOffset kafkaPartitionOffset = poll;
            if (null == kafkaPartitionOffset) {
                break;
            }
            TopicAndPartition topicPartition = kafkaPartitionOffset.getTopicPartition();
            Long l = hashMap.get(topicPartition);
            hashMap.put(topicPartition, Long.valueOf(null == l ? kafkaPartitionOffset.getOffset() : Math.max(l.longValue(), kafkaPartitionOffset.getOffset())));
            poll = this.commitQueue.poll();
        }
        if (hashMap.isEmpty()) {
            return;
        }
        commitKafkaOffsets(hashMap);
    }

    private void commitKafkaOffsets(Map<TopicAndPartition, Long> map) throws Exception {
        logger.info("committing offset to kafka, {}", map);
        this.kafkaCluster.setConsumerOffsets(this.groupId, Map$.MODULE$.apply(((scala.collection.mutable.Map) JavaConverters.mapAsScalaMapConverter(map).asScala()).toSeq()));
    }
}
