package org.apache.hadoop.hive.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.kafkaesqueesque.clients.consumer.Consumer;
import org.apache.kafkaesqueesque.clients.consumer.ConsumerRecord;
import org.apache.kafkaesqueesque.clients.consumer.ConsumerRecords;
import org.apache.kafkaesqueesque.common.TopicPartition;
import org.apache.kafkaesqueesque.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/kafka/KafkaRecordIterator.class */
class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class);
    private static final String POLL_TIMEOUT_HINT = String.format("Try increasing poll timeout using Hive Table property [%s]", KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName());
    private static final String ERROR_POLL_TIMEOUT_FORMAT = "Consumer returned [0] record due to exhausted poll timeout [%s]ms from TopicPartition:[%s] start Offset [%s], current consumer position [%s], target end offset [%s], " + POLL_TIMEOUT_HINT;
    private final Consumer<byte[], byte[]> consumer;
    private final TopicPartition topicPartition;
    private final long endOffset;
    private final long startOffset;
    private final long pollTimeoutMs;
    private final Duration pollTimeoutDurationMs;
    private final Stopwatch stopwatch;
    private ConsumerRecords<byte[], byte[]> records;
    private long consumerPosition;
    private ConsumerRecord<byte[], byte[]> nextRecord;
    private boolean hasMore;
    private Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hive/kafka/KafkaRecordIterator$PollTimeoutException.class */
    public static final class PollTimeoutException extends RetriableException {
        private static final long serialVersionUID = 1;

        PollTimeoutException(String str) {
            super(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, @Nullable Long l, @Nullable Long l2, long j) {
        this.stopwatch = Stopwatch.createUnstarted();
        this.hasMore = true;
        this.consumerRecordIterator = null;
        this.consumer = (Consumer) Preconditions.checkNotNull(consumer, "Consumer can not be null");
        this.topicPartition = (TopicPartition) Preconditions.checkNotNull(topicPartition, "Topic partition can not be null");
        this.pollTimeoutMs = j;
        this.pollTimeoutDurationMs = Duration.ofMillis(j);
        Preconditions.checkState(this.pollTimeoutMs > 0, "Poll timeout has to be positive number");
        List singletonList = Collections.singletonList(topicPartition);
        consumer.assign(singletonList);
        if (l2 == null) {
            consumer.seekToEnd(singletonList);
            this.endOffset = consumer.position(topicPartition);
            LOG.info("End Offset set to [{}]", Long.valueOf(this.endOffset));
        } else {
            this.endOffset = l2.longValue();
        }
        if (l != null) {
            LOG.info("Seeking to offset [{}] of topic partition [{}]", l, topicPartition);
            consumer.seek(topicPartition, l.longValue());
            this.startOffset = consumer.position(topicPartition);
            if (this.startOffset != l.longValue()) {
                LOG.warn("Current Start Offset [{}] is different form the requested start position [{}]", Long.valueOf(this.startOffset), l);
            }
        } else {
            consumer.seekToBeginning(Collections.singleton(topicPartition));
            this.startOffset = consumer.position(topicPartition);
            LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]", topicPartition, Long.valueOf(this.startOffset));
        }
        this.consumerPosition = consumer.position(topicPartition);
        Preconditions.checkState(this.endOffset >= this.consumerPosition, "End offset [%s] need to be greater or equal than start offset [%s]", this.endOffset, this.consumerPosition);
        LOG.info("Kafka Iterator assigned to TopicPartition [{}]; start Offset [{}]; end Offset [{}]", new Object[]{topicPartition, Long.valueOf(this.consumerPosition), Long.valueOf(this.endOffset)});
    }

    @VisibleForTesting
    KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, long j) {
        this(consumer, topicPartition, null, null, j);
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        if ((!this.hasMore && this.consumerPosition < this.endOffset) || this.records == null) {
            pollRecords();
            findNext();
        }
        return this.hasMore;
    }

    private void pollRecords() {
        if (LOG.isTraceEnabled()) {
            this.stopwatch.reset().start();
        }
        this.records = this.consumer.poll(this.pollTimeoutDurationMs);
        if (LOG.isTraceEnabled()) {
            this.stopwatch.stop();
            LOG.trace("Pulled [{}] records in [{}] ms", Integer.valueOf(this.records.count()), Long.valueOf(this.stopwatch.elapsed(TimeUnit.MILLISECONDS)));
        }
        if (this.records.isEmpty() && this.consumer.position(this.topicPartition) < this.endOffset) {
            throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT, Long.valueOf(this.pollTimeoutMs), this.topicPartition.toString(), Long.valueOf(this.startOffset), Long.valueOf(this.consumer.position(this.topicPartition)), Long.valueOf(this.endOffset)));
        }
        this.consumerRecordIterator = this.records.iterator();
        this.consumerPosition = this.consumer.position(this.topicPartition);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.Iterator
    public ConsumerRecord<byte[], byte[]> next() {
        ConsumerRecord<byte[], byte[]> consumerRecord = this.nextRecord;
        Preconditions.checkState(consumerRecord.offset() < this.endOffset);
        findNext();
        return consumerRecord;
    }

    private void findNext() {
        if (this.consumerRecordIterator.hasNext()) {
            this.nextRecord = this.consumerRecordIterator.next();
            this.hasMore = this.nextRecord.offset() < this.endOffset;
        } else {
            this.hasMore = false;
            this.nextRecord = null;
        }
    }
}
