package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.jgroups.protocols.INJECT_VIEW;
import org.slf4j.Logger;

/* loaded from: input_file:META-INF/bundled-dependencies/kafka-clients-3.8.1.jar:org/apache/kafka/clients/consumer/internals/FetchCollector.class */
public class FetchCollector<K, V> {
    private final Logger log;
    private final ConsumerMetadata metadata;
    private final SubscriptionState subscriptions;
    private final FetchConfig fetchConfig;
    private final Deserializers<K, V> deserializers;
    private final FetchMetricsManager metricsManager;
    private final Time time;

    /**
     * Wires up a collector with the collaborators it needs to turn buffered fetch responses into
     * records.
     *
     * @param logContext          source of the logger used by this class
     * @param consumerMetadata    metadata handle; used for metadata-update requests and to read the
     *                            metadata expiry interval
     * @param subscriptionState   assignment, pause and position state for the partitions
     * @param fetchConfig         fetch settings read by this class (max poll records, fetch size,
     *                            isolation level)
     * @param deserializers       deserializers handed to {@code CompletedFetch.fetchRecords}
     * @param fetchMetricsManager sink for partition lag and lead metrics
     * @param time                clock used when computing expiry timestamps
     */
    public FetchCollector(
            final LogContext logContext,
            final ConsumerMetadata consumerMetadata,
            final SubscriptionState subscriptionState,
            final FetchConfig fetchConfig,
            final Deserializers<K, V> deserializers,
            final FetchMetricsManager fetchMetricsManager,
            final Time time) {
        // The logger is created first so a null logContext fails before any field is assigned.
        this.log = logContext.logger(FetchCollector.class);

        this.time = time;
        this.metadata = consumerMetadata;
        this.subscriptions = subscriptionState;

        this.fetchConfig = fetchConfig;
        this.deserializers = deserializers;
        this.metricsManager = fetchMetricsManager;
    }

    /**
     * Builds the {@link Fetch} to hand back to the caller from the completed fetches held in
     * {@code fetchBuffer}, collecting at most {@code fetchConfig.maxPollRecords} records.
     *
     * <p>Each pass of the loop looks at the buffer's "next in line" fetch and does one of:
     * <ul>
     *   <li>none, or already consumed: the head of the buffer is initialized if necessary, made the
     *       next-in-line fetch and removed from the buffer; collection stops when the buffer is
     *       empty;</li>
     *   <li>its partition is paused: the fetch is set aside and re-added to the buffer when this
     *       method finishes, so it is not drained;</li>
     *   <li>otherwise: up to the remaining number of records are read from it and appended to the
     *       result.</li>
     * </ul>
     *
     * <p>The loop is wrapped in a single try/catch/finally. A {@link KafkaException} raised after
     * some records were already collected therefore ends collection and returns those records,
     * instead of being swallowed while the loop keeps running against the same failing fetch. The
     * set-aside fetches for paused partitions are re-added exactly once on every exit path.
     *
     * @param fetchBuffer buffer of completed fetches to collect from
     * @return the collected records; may be empty
     * @throws KafkaException if a failure occurs before any record has been collected
     */
    public Fetch<K, V> collectFetch(FetchBuffer fetchBuffer) {
        final Fetch<K, V> fetch = Fetch.empty();
        final ArrayDeque<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
        int recordsRemaining = this.fetchConfig.maxPollRecords;

        try {
            while (recordsRemaining > 0) {
                final CompletedFetch nextInLineFetch = fetchBuffer.nextInLineFetch();

                if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
                    final CompletedFetch head = fetchBuffer.peek();
                    if (head == null) {
                        break;
                    }

                    if (head.isInitialized()) {
                        fetchBuffer.setNextInLineFetch(head);
                    } else {
                        try {
                            fetchBuffer.setNextInLineFetch(initialize(head));
                        } catch (Exception e) {
                            // Drop the failing fetch only when it carries no record bytes and nothing
                            // has been collected yet; otherwise it stays at the head of the buffer.
                            if (fetch.isEmpty() && FetchResponse.recordsOrFail(head.partitionData).sizeInBytes() == 0) {
                                fetchBuffer.poll();
                            }
                            throw e;
                        }
                    }
                    fetchBuffer.poll();
                } else if (this.subscriptions.isPaused(nextInLineFetch.partition)) {
                    this.log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
                    pausedCompletedFetches.add(nextInLineFetch);
                    fetchBuffer.setNextInLineFetch(null);
                } else {
                    final Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch, recordsRemaining);
                    recordsRemaining -= nextFetch.numRecords();
                    fetch.add(nextFetch);
                }
            }
        } catch (KafkaException e) {
            // Surface the failure only when there is nothing to return; otherwise hand back what
            // was collected so far.
            if (fetch.isEmpty()) {
                throw e;
            }
        } finally {
            // Put the fetches of paused partitions back so they are looked at again later.
            fetchBuffer.addAll(pausedCompletedFetches);
        }

        return fetch;
    }

    /**
     * Reads up to {@code i} records from {@code completedFetch}, provided its partition is still
     * assigned and fetchable and the fetch starts exactly at the partition's current position.
     *
     * <p>When records are read, the partition's position is moved forward if the fetch's next
     * offset went past it, and lag/lead metrics are recorded when available. In every other case
     * the fetch is drained and an empty result is returned.
     *
     * @param completedFetch fetch to read from
     * @param i              maximum number of records to read
     * @return the records read together with a flag telling whether the position was advanced, or
     *         an empty fetch if the data was discarded
     * @throws IllegalStateException if the partition is fetchable but has no position
     */
    private Fetch<K, V> fetchRecords(CompletedFetch completedFetch, int i) {
        final TopicPartition tp = completedFetch.partition;

        if (!this.subscriptions.isAssigned(tp)) {
            this.log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
        } else if (!this.subscriptions.isFetchable(tp)) {
            this.log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
        } else {
            final SubscriptionState.FetchPosition current = this.subscriptions.position(tp);
            if (current == null) {
                throw new IllegalStateException("Missing position for fetchable partition " + tp);
            }

            if (completedFetch.nextFetchOffset() != current.offset) {
                this.log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", tp, completedFetch.nextFetchOffset(), current);
            } else {
                final List<ConsumerRecord<K, V>> records = completedFetch.fetchRecords(this.fetchConfig, this.deserializers, i);
                this.log.trace("Returning {} fetched records at offset {} for assigned partition {}", records.size(), current, tp);

                // Reading records moves the fetch's next offset; follow it with the position.
                final boolean positionAdvanced = completedFetch.nextFetchOffset() > current.offset;
                if (positionAdvanced) {
                    final SubscriptionState.FetchPosition advanced = new SubscriptionState.FetchPosition(
                            completedFetch.nextFetchOffset(), completedFetch.lastEpoch(), current.currentLeader);
                    this.log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`", current, advanced, tp, records.size());
                    this.subscriptions.position(tp, advanced);
                }

                final Long lag = this.subscriptions.partitionLag(tp, this.fetchConfig.isolationLevel);
                if (lag != null) {
                    this.metricsManager.recordPartitionLag(tp, lag);
                }

                final Long lead = this.subscriptions.partitionLead(tp);
                if (lead != null) {
                    this.metricsManager.recordPartitionLead(tp, lead);
                }

                return Fetch.forPartition(tp, records, positionAdvanced);
            }
        }

        // Nothing usable was found above: discard whatever the fetch still holds.
        this.log.trace("Draining fetched records for partition {}", tp);
        completedFetch.drain();
        return Fetch.empty();
    }

    /**
     * Prepares a completed fetch for record extraction.
     *
     * <p>The outcome depends on the partition's state and on the error code in the response:
     * <ul>
     *   <li>the partition no longer has a valid position: the fetch is ignored;</li>
     *   <li>the error code is {@link Errors#NONE}: the fetch is passed to
     *       {@code handleInitializeSuccess};</li>
     *   <li>any other error code: the fetch is passed to {@code handleInitializeErrors}.</li>
     * </ul>
     *
     * <p>Clean-up runs exactly once in a {@code finally} block, whatever the exit path: aggregated
     * metrics are recorded with zero values unless an initialized fetch is being returned, and the
     * partition is moved to the end of the subscription ordering whenever the response carried an
     * error.
     *
     * @param completedFetch fetch to initialize
     * @return the initialized fetch, or {@code null} if it was ignored, discarded or failed
     */
    protected CompletedFetch initialize(CompletedFetch completedFetch) {
        final TopicPartition topicPartition = completedFetch.partition;
        final Errors error = Errors.forCode(completedFetch.partitionData.errorCode());
        // Stays true on every path except a successful initialization.
        boolean recordMetrics = true;

        try {
            if (!this.subscriptions.hasValidPosition(topicPartition)) {
                this.log.debug("Ignoring fetched records for partition {} since it no longer has valid position", topicPartition);
                return null;
            }

            if (error == Errors.NONE) {
                final CompletedFetch initialized = handleInitializeSuccess(completedFetch);
                recordMetrics = initialized == null;
                return initialized;
            }

            handleInitializeErrors(completedFetch, error);
            return null;
        } finally {
            if (recordMetrics) {
                completedFetch.recordAggregatedMetrics(0, 0);
            }
            if (error != Errors.NONE) {
                this.subscriptions.movePartitionToEnd(topicPartition);
            }
        }
    }

    /**
     * Handles a fetch response that came back without an error code.
     *
     * <p>The response is discarded when the partition has no position or when the fetch's offset
     * differs from that position. Otherwise, if the response holds at least one batch (or holds no
     * bytes at all), the partition state is updated from it and the fetch is marked initialized.
     *
     * <p>A response that contains bytes but no batch cannot be consumed and is reported as an
     * error; which exception is raised depends on the request version.
     *
     * @param completedFetch fetch whose response carried no error
     * @return {@code completedFetch} once initialized, or {@code null} if the response was discarded
     *         or the partition state could not be updated
     * @throws RecordTooLargeException if the response has bytes but no batch and the request
     *                                 version is below 3
     * @throws KafkaException          if the response has bytes but no batch and the request
     *                                 version is 3 or above
     */
    private CompletedFetch handleInitializeSuccess(CompletedFetch completedFetch) {
        final TopicPartition topicPartition = completedFetch.partition;
        final long nextFetchOffset = completedFetch.nextFetchOffset();

        final SubscriptionState.FetchPosition position = this.subscriptions.positionOrNull(topicPartition);
        if (position == null || position.offset != nextFetchOffset) {
            this.log.debug("Discarding stale fetch response for partition {} since its offset {} does not match the expected offset {} or the partition has been unassigned", topicPartition, nextFetchOffset, position);
            return null;
        }

        final FetchResponseData.PartitionData partitionData = completedFetch.partitionData;
        this.log.trace("Preparing to read {} bytes of data for partition {} with offset {}", FetchResponse.recordsSize(partitionData), topicPartition, position);

        final boolean hasBatches = FetchResponse.recordsOrFail(partitionData).batches().iterator().hasNext();
        if (!hasBatches && FetchResponse.recordsSize(partitionData) > 0) {
            // Bytes were returned, yet not a single complete batch could be found in them.
            if (completedFetch.requestVersion >= 3) {
                throw new KafkaException("Failed to make progress reading messages at " + topicPartition + "=" + nextFetchOffset + ". Received a non-empty fetch response from the server, but no complete records were found.");
            }
            final Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(topicPartition, nextFetchOffset);
            throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchConfig.fetchSize + " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")", recordTooLargePartitions);
        }

        if (!updatePartitionState(partitionData, topicPartition)) {
            return null;
        }

        completedFetch.setInitialized();
        return completedFetch;
    }

    /**
     * Copies the offsets reported in a fetch response into the subscription state.
     *
     * <p>The high watermark, log start offset and last stable offset are each applied only when the
     * response reports a non-negative value, in that order. If the response names a preferred read
     * replica, that is applied last together with an expiry time of "now" plus the metadata expiry
     * interval.
     *
     * @param partitionData  partition section of the fetch response
     * @param topicPartition partition the data belongs to
     * @return {@code false} as soon as the subscription state rejects one of the updates (remaining
     *         updates are skipped); {@code true} otherwise
     */
    private boolean updatePartitionState(FetchResponseData.PartitionData partitionData, TopicPartition topicPartition) {
        final long highWatermark = partitionData.highWatermark();
        if (highWatermark >= 0) {
            this.log.trace("Updating high watermark for partition {} to {}", topicPartition, highWatermark);
            if (!this.subscriptions.tryUpdatingHighWatermark(topicPartition, highWatermark)) {
                return false;
            }
        }

        final long logStartOffset = partitionData.logStartOffset();
        if (logStartOffset >= 0) {
            this.log.trace("Updating log start offset for partition {} to {}", topicPartition, logStartOffset);
            if (!this.subscriptions.tryUpdatingLogStartOffset(topicPartition, logStartOffset)) {
                return false;
            }
        }

        final long lastStableOffset = partitionData.lastStableOffset();
        if (lastStableOffset >= 0) {
            this.log.trace("Updating last stable offset for partition {} to {}", topicPartition, lastStableOffset);
            if (!this.subscriptions.tryUpdatingLastStableOffset(topicPartition, lastStableOffset)) {
                return false;
            }
        }

        if (!FetchResponse.isPreferredReplica(partitionData)) {
            return true;
        }

        // The expiry is computed by the callback, i.e. only if and when the subscription state asks for it.
        return this.subscriptions.tryUpdatingPreferredReadReplica(topicPartition, partitionData.preferredReadReplica(), () -> {
            final long expireTimeMs = this.time.milliseconds() + this.metadata.metadataExpireMs();
            this.log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}", topicPartition, partitionData.preferredReadReplica(), expireTimeMs);
            return expireTimeMs;
        });
    }

    /**
     * Reacts to a fetch response that carried an error code.
     *
     * <p>Depending on the error this logs and requests a metadata update, logs only, delegates to
     * the out-of-range handling, or throws.
     *
     * @param completedFetch fetch whose response carried the error
     * @param errors         error decoded from the response
     * @throws TopicAuthorizationException if the error is {@code TOPIC_AUTHORIZATION_FAILED}
     * @throws OffsetOutOfRangeException   if the position is out of range and no default reset
     *                                     policy is configured
     * @throws KafkaException              if the error is {@code CORRUPT_MESSAGE}
     * @throws IllegalStateException       if the error is not one of the handled codes
     */
    private void handleInitializeErrors(CompletedFetch completedFetch, Errors errors) {
        final TopicPartition tp = completedFetch.partition;
        final long fetchOffset = completedFetch.nextFetchOffset();

        switch (errors) {
            case NOT_LEADER_OR_FOLLOWER:
            case REPLICA_NOT_AVAILABLE:
            case KAFKA_STORAGE_ERROR:
            case FENCED_LEADER_EPOCH:
            case OFFSET_NOT_AVAILABLE:
                this.log.debug("Error in fetch for partition {}: {}", tp, errors.exceptionName());
                FetchUtils.requestMetadataUpdate(this.metadata, this.subscriptions, tp);
                break;

            case UNKNOWN_TOPIC_OR_PARTITION:
                this.log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
                FetchUtils.requestMetadataUpdate(this.metadata, this.subscriptions, tp);
                break;

            case UNKNOWN_TOPIC_ID:
                this.log.warn("Received unknown topic ID error in fetch for partition {}", tp);
                FetchUtils.requestMetadataUpdate(this.metadata, this.subscriptions, tp);
                break;

            case INCONSISTENT_TOPIC_ID:
                this.log.warn("Received inconsistent topic ID error in fetch for partition {}", tp);
                FetchUtils.requestMetadataUpdate(this.metadata, this.subscriptions, tp);
                break;

            case OFFSET_OUT_OF_RANGE:
                handleOffsetOutOfRange(tp, fetchOffset, errors);
                break;

            case TOPIC_AUTHORIZATION_FAILED:
                this.log.warn("Not authorized to read from partition {}.", tp);
                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));

            case UNKNOWN_LEADER_EPOCH:
                this.log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
                break;

            case UNKNOWN_SERVER_ERROR:
                this.log.warn("Unknown server error while fetching offset {} for topic-partition {}", fetchOffset, tp);
                break;

            case CORRUPT_MESSAGE:
                throw new KafkaException("Encountered corrupt message when fetching offset " + fetchOffset + " for topic-partition " + tp);

            default:
                throw new IllegalStateException("Unexpected error code " + ((int) errors.code()) + " while fetching at offset " + fetchOffset + " from topic-partition " + tp);
        }
    }

    /**
     * Handles an out-of-range error for a fetch at {@code fetchOffset}.
     *
     * <p>If a preferred read replica was set it is cleared and nothing else is done. Otherwise the
     * error is ignored when the partition has no position or the position differs from
     * {@code fetchOffset}; if they match, an offset reset is requested when a default reset policy
     * exists, and an exception is raised when it does not.
     */
    private void handleOffsetOutOfRange(TopicPartition tp, long fetchOffset, Errors errors) {
        final Optional<Integer> clearedReplica = this.subscriptions.clearPreferredReadReplica(tp);
        if (clearedReplica.isPresent()) {
            this.log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", clearedReplica.get(), tp, errors, fetchOffset);
            return;
        }

        final SubscriptionState.FetchPosition position = this.subscriptions.positionOrNull(tp);
        if (position == null || fetchOffset != position.offset) {
            this.log.debug("Discarding stale fetch response for partition {} since the fetched offset {} does not match the current offset {} or the partition has been unassigned", tp, fetchOffset, position);
            return;
        }

        final String errorMessage = "Fetch position " + position + " is out of range for partition " + tp;
        if (this.subscriptions.hasDefaultOffsetResetPolicy()) {
            this.log.info("{}, resetting offset", errorMessage);
            this.subscriptions.requestOffsetResetIfPartitionAssigned(tp);
        } else {
            this.log.info("{}, raising error to the application since no reset policy is configured", errorMessage);
            throw new OffsetOutOfRangeException(errorMessage, Collections.singletonMap(tp, Long.valueOf(position.offset)));
        }
    }
}
