package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.internals.AbstractFetch;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

/* loaded from: input_file:WEB-INF/lib/kafka-clients-3.7.1.jar:org/apache/kafka/clients/consumer/internals/Fetcher.class */
/**
 * {@link AbstractFetch} implementation that sends its fetch requests through a
 * {@link ConsumerNetworkClient} and hands buffered results to a {@link FetchCollector}.
 *
 * <p>Response handlers registered by {@link #sendFetches()} run while holding this
 * instance's monitor, the same monitor taken by the {@code synchronized} methods of
 * this class.
 *
 * @param <K> key type passed through to the {@link FetchCollector} and {@link Deserializers}
 * @param <V> value type passed through to the {@link FetchCollector} and {@link Deserializers}
 */
public class Fetcher<K, V> extends AbstractFetch {
    private final Logger log;
    private final ConsumerNetworkClient client;
    private final FetchCollector<K, V> fetchCollector;

    /**
     * Creates a fetcher backed by the given network client.
     *
     * <p>A new {@link FetchBuffer} is created here and passed to the superclass; the
     * remaining arguments are forwarded to the superclass and/or the {@link FetchCollector}.
     */
    public Fetcher(LogContext logContext, ConsumerNetworkClient consumerNetworkClient, ConsumerMetadata consumerMetadata, SubscriptionState subscriptionState, FetchConfig fetchConfig, Deserializers<K, V> deserializers, FetchMetricsManager fetchMetricsManager, Time time, ApiVersions apiVersions) {
        super(logContext, consumerMetadata, subscriptionState, fetchConfig, new FetchBuffer(logContext), fetchMetricsManager, time, apiVersions);
        this.log = logContext.logger(Fetcher.class);
        this.client = consumerNetworkClient;
        this.fetchCollector = new FetchCollector<>(logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time);
    }

    /** Delegates to {@link ConsumerNetworkClient#isUnavailable(Node)}. */
    @Override
    protected boolean isUnavailable(Node node) {
        return client.isUnavailable(node);
    }

    /** Delegates to {@link ConsumerNetworkClient#maybeThrowAuthFailure(Node)}. */
    @Override
    protected void maybeThrowAuthFailure(Node node) {
        client.maybeThrowAuthFailure(node);
    }

    /**
     * Asks the fetch buffer to retain only the given partitions.
     *
     * @param collection partitions to keep; copied into a new set before being passed on
     */
    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> collection) {
        // Typed diamond instead of a raw HashSet so the call is checked by the compiler.
        fetchBuffer.retainAll(new HashSet<>(collection));
    }

    /**
     * Prepares fetch requests and sends one request per target node.
     *
     * <p>The success and failure callbacks invoke {@code handleFetchSuccess} and
     * {@code handleFetchFailure} respectively while synchronized on this fetcher.
     *
     * @return the number of entries in the prepared request map
     */
    public synchronized int sendFetches() {
        Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = prepareFetchRequests();
        sendFetchesInternal(
                fetchRequests,
                (fetchTarget, data, response) -> {
                    synchronized (this) {
                        handleFetchSuccess(fetchTarget, data, response);
                    }
                },
                (fetchTarget, data, error) -> {
                    synchronized (this) {
                        handleFetchFailure(fetchTarget, data, error);
                    }
                });
        return fetchRequests.size();
    }

    /**
     * Sends the requests produced by {@code prepareCloseFetchSessionRequests()} and polls
     * the client until either every returned future is done or the timer has expired.
     *
     * <p>If the timer expires first, a debug message is logged and the method returns
     * normally.
     *
     * @param timer bounds how long this method keeps polling
     */
    protected void maybeCloseFetchSessions(Timer timer) {
        List<RequestFuture<ClientResponse>> pending = sendFetchesInternal(
                prepareCloseFetchSessionRequests(),
                this::handleCloseFetchSessionSuccess,
                this::handleCloseFetchSessionFailure);

        while (timer.notExpired() && !allDone(pending)) {
            // The explicit cast is kept so overload resolution of poll(...) cannot become ambiguous.
            client.poll(timer, (ConsumerNetworkClient.PollCondition) null, true);
            timer.update();
        }

        if (!allDone(pending)) {
            log.debug("All requests couldn't be sent in the specific timeout period {}ms. This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
        }
    }

    /**
     * Returns whatever the {@link FetchCollector} collects from this fetcher's buffer.
     *
     * @return the result of {@code fetchCollector.collectFetch(fetchBuffer)}
     */
    public Fetch<K, V> collectFetch() {
        return fetchCollector.collectFetch(fetchBuffer);
    }

    /**
     * Disables wakeups on the client, attempts to close fetch sessions within the given
     * timer, and then runs the superclass close logic.
     *
     * <p>NOTE(review): a decompiler annotation on this method indicated its access
     * modifier was changed from {@code protected}; confirm the intended visibility
     * against the upstream source.
     *
     * @param timer bounds the time spent closing fetch sessions; also passed to the superclass
     */
    @Override
    public synchronized void closeInternal(Timer timer) {
        client.disableWakeups();
        maybeCloseFetchSessions(timer);
        super.closeInternal(timer);
    }

    /** Returns {@code true} when every future in the list reports {@code isDone()}. */
    private static boolean allDone(List<RequestFuture<ClientResponse>> futures) {
        return futures.stream().allMatch(RequestFuture::isDone);
    }

    /**
     * Sends one fetch request per map entry and attaches a listener that forwards the
     * outcome to the matching handler.
     *
     * @param requests       request data keyed by target node
     * @param successHandler invoked with the node, its request data and the response on success
     * @param failureHandler invoked with the node, its request data and the exception on failure
     * @return the futures returned by the client, in the map's iteration order
     */
    private List<RequestFuture<ClientResponse>> sendFetchesInternal(Map<Node, FetchSessionHandler.FetchRequestData> requests, final AbstractFetch.ResponseHandler<ClientResponse> successHandler, final AbstractFetch.ResponseHandler<Throwable> failureHandler) {
        List<RequestFuture<ClientResponse>> futures = new ArrayList<>(requests.size());
        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : requests.entrySet()) {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            RequestFuture<ClientResponse> future = client.send(fetchTarget, createFetchRequest(fetchTarget, data));
            // RequestFutureListener has two callbacks, so an anonymous class is used rather than a lambda.
            future.addListener(new RequestFutureListener<ClientResponse>() {
                @Override
                public void onSuccess(ClientResponse clientResponse) {
                    successHandler.handle(fetchTarget, data, clientResponse);
                }

                @Override
                public void onFailure(RuntimeException runtimeException) {
                    failureHandler.handle(fetchTarget, data, runtimeException);
                }
            });
            futures.add(future);
        }
        return futures;
    }
}
